/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.broker.admin;

import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.NotAcceptableException;
import javax.ws.rs.core.Response.Status;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.EntryFilter2Test;
import org.apache.pulsar.broker.service.plugin.EntryFilterDefinition;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
import org.apache.pulsar.broker.service.plugin.EntryFilterTest;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.testcontext.MockEntryFilterProvider;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.Mode;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.admin.Topics.QueryParam;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-admin")
public class AdminApi2Test extends MockedPulsarServiceBaseTest {

    private MockedPulsarService mockPulsarSetup;
    private boolean restartClusterAfterTest;
    private int usageCount;
    private String defaultNamespace;
    private String defaultTenant;

    @BeforeClass
    @Override
    public void setup() throws Exception {
        super.internalSetup();

        // create otherbroker to test redirect on calls that need
        // namespace ownership
        mockPulsarSetup = new MockedPulsarService(this.conf);
        mockPulsarSetup.setup();

        setupClusters();
    }

    @Test
    public void testExceptionOfMaxTopicsPerNamespaceCanBeHanle() throws Exception {
        super.internalCleanup();
        conf.setMaxTopicsPerNamespace(3);
        super.internalSetup();
        String topic = "persistent://testTenant/ns1/test_create_topic_v";
        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
                Sets.newHashSet("test"));
        // check producer/consumer auto create non-partitioned topic
        conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
        admin.tenants().createTenant("testTenant", tenantInfo);
        admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));

        pulsarClient.newProducer().topic(topic + "1").create().close();
        pulsarClient.newProducer().topic(topic + "2").create().close();
        pulsarClient.newConsumer().topic(topic + "3").subscriptionName("test_sub").subscribe().close();
        try {
            pulsarClient.newConsumer().topic(topic + "4").subscriptionName("test_sub")
                    .subscribeAsync().get(5, TimeUnit.SECONDS);
            Assert.fail();
        } catch (Exception e) {
            assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException);
        }

        // reset configuration
        conf.setMaxTopicsPerNamespace(0);
        conf.setDefaultNumPartitions(1);
    }

    @Override
    protected ServiceConfiguration getDefaultConf() {
        ServiceConfiguration conf = super.getDefaultConf();
        configureDefaults(conf);
        return conf;
    }

    void configureDefaults(ServiceConfiguration conf) {
        conf.setForceDeleteNamespaceAllowed(true);
        conf.setLoadBalancerEnabled(true);
        conf.setAllowOverrideEntryFilters(true);
        conf.setEntryFilterNames(List.of());
        conf.setMaxNumPartitionsPerPartitionedTopic(0);
    }

    @AfterClass(alwaysRun = true)
    @Override
    public void cleanup() throws Exception {
        super.internalCleanup();
        if (mockPulsarSetup != null) {
            mockPulsarSetup.cleanup();
            mockPulsarSetup = null;
        }
        resetConfig();
    }

    @AfterMethod(alwaysRun = true)
    public void resetClusters() throws Exception {
        if (restartClusterAfterTest) {
            restartClusterAndResetUsageCount();
        } else {
            try {
                cleanupCluster();
            } catch (Exception e) {
                log.error("Failed to clean up state by deleting namespaces and tenants after test. "
                        + "Restarting the test broker.", e);
                restartClusterAndResetUsageCount();
            }
        }
    }

    private void cleanupCluster() throws Exception {
        pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
        for (String tenant : admin.tenants().getTenants()) {
            for (String namespace : admin.namespaces().getNamespaces(tenant)) {
                deleteNamespaceWithRetry(namespace, true, admin, pulsar,
                        mockPulsarSetup.getPulsar());
            }
            try {
                admin.tenants().deleteTenant(tenant, true);
            } catch (Exception e) {
                log.error("Failed to delete tenant {} after test", tenant, e);
                String zkDirectory = "/managed-ledgers/" + tenant;
                try {
                    log.info("Listing {} to see if existing keys are preventing deletion.", zkDirectory);
                    pulsar.getPulsarResources().getLocalMetadataStore().get().getChildren(zkDirectory)
                            .get(5, TimeUnit.SECONDS).forEach(key -> log.info("Child key '{}'", key));
                } catch (Exception ignore) {
                    log.error("Failed to list tenant {} ZK directory {} after test", tenant, zkDirectory, e);
                }
                throw e;
            }
        }

        for (String cluster : admin.clusters().getClusters()) {
            admin.clusters().deleteCluster(cluster);
        }

        configureDefaults(conf);
        setupClusters();
    }

    private void restartClusterAfterTest() {
        restartClusterAfterTest = true;
    }

    private void restartClusterAndResetUsageCount() throws Exception {
        cleanup();
        restartClusterAfterTest = false;
        usageCount = 0;
        setup();
    }

    private void restartClusterIfReused() throws Exception {
        if (usageCount > 1) {
            restartClusterAndResetUsageCount();
        }
    }

    @BeforeMethod
    public void increaseUsageCount() {
        usageCount++;
    }

    private void setupClusters() throws PulsarAdminException {
        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
        defaultTenant = newUniqueName("prop-xyz");
        admin.tenants().createTenant(defaultTenant, tenantInfo);
        defaultNamespace = defaultTenant + "/ns1";
        admin.namespaces().createNamespace(defaultNamespace, Set.of("test"));
    }

    @DataProvider(name = "topicType")
    public Object[][] topicTypeProvider() {
        return new Object[][] { { TopicDomain.persistent.value() }, { TopicDomain.non_persistent.value() } };
    }

    @DataProvider(name = "namespaceNames")
    public Object[][] namespaceNameProvider() {
        return new Object[][] { { "ns1" }, { "global" } };
    }

    @DataProvider(name = "isV1")
    public Object[][] isV1() {
        return new Object[][] { { true }, { false } };
    }


    /**
     * It verifies http error code when updating partitions to ensure compatibility.
     */
    @Test
    public void testUpdatePartitionsErrorCode() {
        final String nonPartitionedTopicName = "non-partitioned-topic-name" + UUID.randomUUID();
        try {
            // Update a non-partitioned topic
            admin.topics().updatePartitionedTopic(nonPartitionedTopicName, 2);
            Assert.fail("Expect conflict exception.");
        } catch (PulsarAdminException ex) {
            Assert.assertEquals(ex.getStatusCode(), 409 /*Conflict*/);
            Assert.assertTrue(ex instanceof PulsarAdminException.ConflictException);
        }
    }

    /**
     * <pre>
     * It verifies increasing partitions for partitioned-topic.
     * 1. create a partitioned-topic
     * 2. update partitions with larger number of partitions
     * 3. verify: getPartitionedMetadata and check number of partitions
     * 4. verify: this api creates existing subscription to new partitioned-topics
     *            so, message will not be lost in new partitions
     *  a. start producer and produce messages
     *  b. check existing subscription for new topics and it should have backlog msgs
     *
     * </pre>
     *
     * @throws Exception
     */
    @Test
    public void testIncrementPartitionsOfTopic() throws Exception {
        final String topicName = "increment-partitionedTopic";
        final String subName1 = topicName + "-my-sub-1/encode";
        final String subName2 = topicName + "-my-sub-2/encode";
        final int startPartitions = 4;
        final int newPartitions = 8;
        final String partitionedTopicName = "persistent://" + defaultNamespace + "/" + topicName;

        URL pulsarUrl = new URL(pulsar.getWebServiceAddress());

        admin.topics().createPartitionedTopic(partitionedTopicName, startPartitions);
        // validate partition topic is created
        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
                startPartitions);

        // create consumer and subscriptions : check subscriptions
        @Cleanup
        PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build();
        Consumer<byte[]> consumer1 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName1)
                .subscriptionType(SubscriptionType.Shared).subscribe();
        assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList(subName1));
        Consumer<byte[]> consumer2 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName2)
                .subscriptionType(SubscriptionType.Shared).subscribe();
        assertEquals(new HashSet<>(admin.topics().getSubscriptions(partitionedTopicName)),
                Set.of(subName1, subName2));

        // (1) update partitions
        admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions);
        // verify new partitions have been created
        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
                newPartitions);
        // (2) No Msg loss: verify new partitions have the same existing subscription names
        final String newPartitionTopicName = TopicName.get(partitionedTopicName).getPartition(startPartitions + 1)
                .toString();

        // (3) produce messages to all partitions including newly created partitions (RoundRobin)
        Producer<byte[]> producer = client.newProducer()
            .topic(partitionedTopicName)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
            .create();
        final int totalMessages = newPartitions * 2;
        for (int i = 0; i < totalMessages; i++) {
            String message = "message-" + i;
            producer.send(message.getBytes());
        }

        // (4) verify existing subscription has not lost any message: create new consumer with sub-2: it will load all
        // newly created partition topics
        consumer2.close();
        consumer2 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName2)
                .subscriptionType(SubscriptionType.Shared).subscribe();
        assertEquals(new HashSet<>(admin.topics().getSubscriptions(newPartitionTopicName)),
                Set.of(subName1, subName2));

        assertEquals(new HashSet<>(admin.topics().getList(defaultNamespace)).size(), newPartitions);

        // test cumulative stats for partitioned topic
        PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(partitionedTopicName, false);
        assertEquals(topicStats.getSubscriptions().keySet(), new TreeSet<>(Lists.newArrayList(subName1, subName2)));
        assertEquals(topicStats.getSubscriptions().get(subName2).getConsumers().size(), 1);
        assertEquals(topicStats.getSubscriptions().get(subName2).getMsgBacklog(), totalMessages);
        assertEquals(topicStats.getPublishers().size(), 1);
        assertEquals(topicStats.getPartitions(), new HashMap<>());

        // (5) verify: each partition should have backlog
        topicStats = admin.topics().getPartitionedStats(partitionedTopicName, true);
        assertEquals(topicStats.getMetadata().partitions, newPartitions);
        Set<String> partitionSet = new HashSet<>();
        for (int i = 0; i < newPartitions; i++) {
            partitionSet.add(partitionedTopicName + "-partition-" + i);
        }
        assertEquals(topicStats.getPartitions().keySet(), partitionSet);
        for (int i = 0; i < newPartitions; i++) {
            TopicStats partitionStats = topicStats.getPartitions()
                    .get(TopicName.get(partitionedTopicName).getPartition(i).toString());
            assertEquals(partitionStats.getPublishers().size(), 1);
            assertEquals(partitionStats.getSubscriptions().get(subName2).getConsumers().size(), 1);
            assertEquals(partitionStats.getSubscriptions().get(subName2).getMsgBacklog(), 2, 1);
        }

        producer.close();
        consumer1.close();
        consumer2.close();
        consumer2.close();
    }

    @Test
    public void testTopicPoliciesWithMultiBroker() throws Exception {
        restartClusterAfterTest();

        //setup cluster with 3 broker
        admin.clusters().updateCluster("test",
                ClusterData.builder().serviceUrl((pulsar.getWebServiceAddress() + ",localhost:1026," + "localhost:2050")).build());
        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
        String tenantName = newUniqueName("prop-xyz2");
        admin.tenants().createTenant(tenantName, tenantInfo);
        admin.namespaces().createNamespace(tenantName + "/ns1", Set.of("test"));
        ServiceConfiguration config2 = super.getDefaultConf();
        @Cleanup
        PulsarTestContext pulsarTestContext2 = createAdditionalPulsarTestContext(config2);
        PulsarService pulsar2 = pulsarTestContext2.getPulsarService();
        ServiceConfiguration config3 = super.getDefaultConf();
        @Cleanup
        PulsarTestContext pulsarTestContext3 = createAdditionalPulsarTestContext(config3);
        PulsarService pulsar3 = pulsarTestContext.getPulsarService();
        @Cleanup
        PulsarAdmin admin2 = PulsarAdmin.builder().serviceHttpUrl(pulsar2.getWebServiceAddress()).build();
        @Cleanup
        PulsarAdmin admin3 = PulsarAdmin.builder().serviceHttpUrl(pulsar3.getWebServiceAddress()).build();

        //for partitioned topic, we can get topic policies from every broker
        final String topic = "persistent://" + tenantName + "/ns1/" + newUniqueName("test");
        int partitionNum = 3;
        admin.topics().createPartitionedTopic(topic, partitionNum);
        pulsarClient.newConsumer().topic(topic).subscriptionName("sub").subscribe().close();

        setTopicPoliciesAndValidate(admin2, admin3, topic);
        //for non-partitioned topic, we can get topic policies from every broker
        final String topic2 = "persistent://" + tenantName + "/ns1/" + newUniqueName("test");
        pulsarClient.newConsumer().topic(topic2).subscriptionName("sub").subscribe().close();
        setTopicPoliciesAndValidate(admin2, admin3, topic2);
    }

    private void setTopicPoliciesAndValidate(PulsarAdmin admin2
            , PulsarAdmin admin3, String topic) throws Exception {
        admin.topics().setMaxUnackedMessagesOnConsumer(topic, 100);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic)));
        admin.topics().setMaxConsumers(topic, 101);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getMaxConsumers(topic)));
        admin.topics().setMaxProducers(topic, 102);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getMaxProducers(topic)));

        assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), 100);
        assertEquals(admin2.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), 100);
        assertEquals(admin3.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), 100);
        assertEquals(admin.topics().getMaxConsumers(topic).intValue(), 101);
        assertEquals(admin2.topics().getMaxConsumers(topic).intValue(), 101);
        assertEquals(admin3.topics().getMaxConsumers(topic).intValue(), 101);
        assertEquals(admin.topics().getMaxProducers(topic).intValue(), 102);
        assertEquals(admin2.topics().getMaxProducers(topic).intValue(), 102);
        assertEquals(admin3.topics().getMaxProducers(topic).intValue(), 102);
    }

    /**
     * verifies admin api command for non-persistent topic. It verifies: partitioned-topic, stats
     *
     * @throws Exception
     */
    @Test
    public void nonPersistentTopics() throws Exception {
        final String topicName = "nonPersistentTopic";

        final String nonPersistentTopicName = "non-persistent://" + defaultNamespace + "/" + topicName;
        // Force to create a topic
        publishMessagesOnTopic(nonPersistentTopicName, 0, 0);

        // create consumer and subscription
        @Cleanup
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsar.getWebServiceAddress())
                .statsInterval(0, TimeUnit.SECONDS)
                .build();
        Consumer<byte[]> consumer = client.newConsumer().topic(nonPersistentTopicName).subscriptionName("my-sub")
                .subscribe();

        publishMessagesOnTopic(nonPersistentTopicName, 10, 0);

        NonPersistentTopicStats topicStats = (NonPersistentTopicStats) admin.topics().getStats(nonPersistentTopicName);
        assertEquals(topicStats.getSubscriptions().keySet(), Set.of("my-sub"));
        assertEquals(topicStats.getSubscriptions().get("my-sub").getConsumers().size(), 1);
        assertEquals(topicStats.getSubscriptions().get("my-sub").getMsgDropRate(), 0);
        assertEquals(topicStats.getPublishers().size(), 0);
        assertEquals(topicStats.getMsgDropRate(), 0);
        assertEquals(topicStats.getOwnerBroker(), pulsar.getBrokerId());

        PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(nonPersistentTopicName, false);
        assertEquals(internalStats.cursors.keySet(), Set.of("my-sub"));

        consumer.close();
        topicStats = (NonPersistentTopicStats) admin.topics().getStats(nonPersistentTopicName);
        assertFalse(topicStats.getSubscriptions().containsKey("my-sub"));
        assertEquals(topicStats.getPublishers().size(), 0);
        // test partitioned-topic
        final String partitionedTopicName = "non-persistent://" + defaultNamespace + "/paritioned";
        admin.topics().createPartitionedTopic(partitionedTopicName, 5);
        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 5);
    }

    private void publishMessagesOnTopic(String topicName, int messages, int startIdx) throws Exception {
        Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();

        for (int i = startIdx; i < (messages + startIdx); i++) {
            String message = "message-" + i;
            producer.send(message.getBytes());
        }

        producer.close();
    }

    /**
     * verifies validation on persistent-policies
     *
     * @throws Exception
     */
    @Test
    public void testSetPersistencePolicies() throws Exception {

        final String namespace = newUniqueName(defaultTenant + "/ns2");
        admin.namespaces().createNamespace(namespace, Set.of("test"));

        assertNull(admin.namespaces().getPersistence(namespace));
        admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 3, 10.0));
        assertEquals(admin.namespaces().getPersistence(namespace), new PersistencePolicies(3, 3, 3, 10.0));

        try {
            admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 4, 3, 10.0));
            fail("should have failed");
        } catch (PulsarAdminException e) {
            assertEquals(e.getStatusCode(), 400);
        }
        try {
            admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 4, 10.0));
            fail("should have failed");
        } catch (PulsarAdminException e) {
            assertEquals(e.getStatusCode(), 400);
        }
        try {
            admin.namespaces().setPersistence(namespace, new PersistencePolicies(6, 3, 1, 10.0));
            fail("should have failed");
        } catch (PulsarAdminException e) {
            assertEquals(e.getStatusCode(), 400);
        }

        // make sure policies has not been changed
        assertEquals(admin.namespaces().getPersistence(namespace), new PersistencePolicies(3, 3, 3, 10.0));
    }

    /**
     * validates update of persistent-policies reflects on managed-ledger and managed-cursor
     *
     * @throws Exception
     */
    @Test
    public void testUpdatePersistencePolicyUpdateManagedCursor() throws Exception {

        final String namespace = newUniqueName(defaultTenant + "/ns2");
        final String topicName = "persistent://" + namespace + "/topic1";
        admin.namespaces().createNamespace(namespace, Set.of("test"));

        admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 3, 50.0));
        assertEquals(admin.namespaces().getPersistence(namespace), new PersistencePolicies(3, 3, 3, 50.0));

        Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();

        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
        ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.getCursors().iterator().next();

        final double newThrottleRate = 100;
        final int newEnsembleSize = 5;
        admin.namespaces().setPersistence(namespace, new PersistencePolicies(newEnsembleSize, 3, 3, newThrottleRate));

        retryStrategically((test) -> managedLedger.getConfig().getEnsembleSize() == newEnsembleSize
                && cursor.getThrottleMarkDelete() != newThrottleRate, 5, 200);

        // (1) verify cursor.markDelete has been updated
        assertEquals(cursor.getThrottleMarkDelete(), newThrottleRate);

        // (2) verify new ledger creation takes new config
        producer.close();
        consumer.close();
    }

    /**
     * Verify unloading topic
     *
     * @throws Exception
     */
    @Test(dataProvider = "topicType")
    public void testUnloadTopic(final String topicType) throws Exception {

        final String namespace = newUniqueName(defaultTenant + "/ns2");
        final String topicName = topicType + "://" + namespace + "/topic1";
        admin.namespaces().createNamespace(namespace, Set.of("test"));

        // create a topic by creating a producer
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        producer.close();

        Topic topic = pulsar.getBrokerService().getTopicIfExists(topicName).join().get();
        final boolean isPersistentTopic = topic instanceof PersistentTopic;

        // (1) unload the topic
        unloadTopic(topicName);

        // topic must be removed from map
        assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

        // recreation of producer will load the topic again
        @Cleanup
        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName).create();
        topic = pulsar.getBrokerService().getTopicReference(topicName).get();
        assertNotNull(topic);
        // unload the topic
        unloadTopic(topicName);
        // producer will retry and recreate the topic
        Awaitility.await().until(() -> pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        // topic should be loaded by this time
        topic = pulsar.getBrokerService().getTopicReference(topicName).get();
        assertNotNull(topic);
    }

    private void unloadTopic(String topicName) throws Exception {
        admin.topics().unload(topicName);
    }

    /**
     * Verifies reset-cursor at specific position using admin-api.
     *
     * <pre>
     * 1. Publish 50 messages
     * 2. Consume 20 messages
     * 3. reset cursor position on 10th message
     * 4. consume 40 messages from reset position
     * </pre>
     *
     * @param namespaceName
     * @throws Exception
     */
    @Test(dataProvider = "namespaceNames", timeOut = 30000)
    public void testResetCursorOnPosition(String namespaceName) throws Exception {
        restartClusterAfterTest();
        final String topicName = "persistent://" + defaultTenant + "/use/" + namespaceName + "/resetPosition";
        final int totalProducedMessages = 50;

        // set retention
        admin.namespaces().setRetention(defaultNamespace, new RetentionPolicies(10, 10));

        // create consumer and subscription
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Shared).subscribe();

        assertEquals(admin.topics().getSubscriptions(topicName), List.of("my-sub"));

        publishMessagesOnPersistentTopic(topicName, totalProducedMessages, 0);

        List<Message<byte[]>> messages = admin.topics().peekMessages(topicName, "my-sub", 10);
        assertEquals(messages.size(), 10);

        Message<byte[]> message = null;
        MessageIdImpl resetMessageId = null;
        int resetPositionId = 10;
        for (int i = 0; i < 20; i++) {
            message = consumer.receive(1, TimeUnit.SECONDS);
            consumer.acknowledge(message);
            if (i == resetPositionId) {
                resetMessageId = (MessageIdImpl) message.getMessageId();
            }
        }

        // close consumer which will clean up internal-receive-queue
        consumer.close();

        // messages should still be available due to retention
        MessageIdImpl messageId = new MessageIdImpl(
                resetMessageId.getLedgerId(),
                resetMessageId.getEntryId(),
                -1);
        // reset position at resetMessageId
        admin.topics().resetCursor(topicName, "my-sub", messageId);

        consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Shared).subscribe();
        MessageIdImpl msgId2 = (MessageIdImpl) consumer.receive(1, TimeUnit.SECONDS).getMessageId();
        assertEquals(resetMessageId, msgId2);

        int receivedAfterReset = 1; // start with 1 because we have already received 1 msg

        for (int i = 0; i < totalProducedMessages; i++) {
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
            if (message == null) {
                break;
            }
            consumer.acknowledge(message);
            ++receivedAfterReset;
        }
        assertEquals(receivedAfterReset, totalProducedMessages - resetPositionId);

        // invalid topic name
        try {
            admin.topics().resetCursor(topicName + "invalid", "my-sub", messageId);
            fail("It should have failed due to invalid topic name");
        } catch (PulsarAdminException.NotFoundException e) {
            assertTrue(e.getMessage().contains(topicName));
            // Ok
        }

        // invalid cursor name
        try {
            admin.topics().resetCursor(topicName, "invalid-sub", messageId);
            fail("It should have failed due to invalid subscription name");
        } catch (PulsarAdminException.NotFoundException e) {
            assertTrue(e.getMessage().contains("invalid-sub"));
            // Ok
        }

        // invalid position
        try {
            messageId = new MessageIdImpl(0, 0, -1);
            admin.topics().resetCursor(topicName, "my-sub", messageId);
        } catch (PulsarAdminException.PreconditionFailedException e) {
            fail("It shouldn't fail for a invalid position");
        }

        consumer.close();
    }

    @Test
    public void shouldNotSupportResetOnPartitionedTopic() throws PulsarAdminException, PulsarClientException {
        final String partitionedTopicName = "persistent://" + defaultNamespace + "/" + newUniqueName("parttopic");
        admin.topics().createPartitionedTopic(partitionedTopicName, 4);
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Shared).subscribe();
        try {
            admin.topics().resetCursor(partitionedTopicName, "my-sub", MessageId.earliest);
            fail();
        } catch (PulsarAdminException.NotAllowedException e) {
            assertTrue(e.getMessage().contains("Reset-cursor at position is not allowed for partitioned-topic"),
                    "Condition doesn't match. Actual message:" + e.getMessage());
        }
    }

    private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
        Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();

        for (int i = startIdx; i < (messages + startIdx); i++) {
            String message = "message-" + i;
            producer.send(message.getBytes());
        }

        producer.close();
    }


    @Test(timeOut = 20000)
    public void testMaxConsumersOnSubApi() throws Exception {
        final String namespace = newUniqueName(defaultTenant + "/ns");
        admin.namespaces().createNamespace(namespace, Set.of("test"));

        assertNull(admin.namespaces().getMaxConsumersPerSubscription(namespace));
        admin.namespaces().setMaxConsumersPerSubscription(namespace, 10);
        Awaitility.await().untilAsserted(() -> {
            assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(namespace));
            assertEquals(admin.namespaces().getMaxConsumersPerSubscription(namespace).intValue(), 10);
        });
        admin.namespaces().removeMaxConsumersPerSubscription(namespace);
        Awaitility.await().untilAsserted(() ->
                admin.namespaces().getMaxConsumersPerSubscription(namespace));
    }

    /**
     * It verifies that pulsar with different load-manager generates different load-report and returned by admin-api
     *
     * @throws Exception
     */
    @Test
    public void testLoadReportApi() throws Exception {
        restartClusterAfterTest();
        this.conf.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
        @Cleanup("cleanup")
        MockedPulsarService mockPulsarSetup1 = new MockedPulsarService(this.conf);
        mockPulsarSetup1.setup();
        PulsarAdmin simpleLoadManagerAdmin = mockPulsarSetup1.getAdmin();
        assertNotNull(simpleLoadManagerAdmin.brokerStats().getLoadReport());

        this.conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
        @Cleanup("cleanup")
        MockedPulsarService mockPulsarSetup2 = new MockedPulsarService(this.conf);
        mockPulsarSetup2.setup();
        PulsarAdmin modularLoadManagerAdmin = mockPulsarSetup2.getAdmin();
        assertNotNull(modularLoadManagerAdmin.brokerStats().getLoadReport());
    }

    @Test
    public void testPeerCluster() throws Exception {
        admin.clusters().createCluster("us-west1",
                ClusterData.builder().serviceUrl("http://broker.messaging.west1.example.com:8080").build());
        admin.clusters().createCluster("us-west2",
                ClusterData.builder().serviceUrl("http://broker.messaging.west2.example.com:8080").build());
        admin.clusters().createCluster("us-east1",
                ClusterData.builder().serviceUrl("http://broker.messaging.east1.example.com:8080").build());
        admin.clusters().createCluster("us-east2",
                ClusterData.builder().serviceUrl("http://broker.messaging.east2.example.com:8080").build());

        admin.clusters().updatePeerClusterNames("us-west1", new LinkedHashSet<>(List.of("us-west2")));
        assertEquals(admin.clusters().getCluster("us-west1").getPeerClusterNames(), Set.of("us-west2"));
        assertNull(admin.clusters().getCluster("us-west2").getPeerClusterNames());
        // update cluster with duplicate peer-clusters in the list
        admin.clusters().updatePeerClusterNames("us-west1",
                new LinkedHashSet<>(List.of("us-west2", "us-east1", "us-west2", "us-east1", "us-west2", "us-east1")));
        assertEquals(admin.clusters().getCluster("us-west1").getPeerClusterNames(),
                List.of("us-west2", "us-east1"));
        admin.clusters().updatePeerClusterNames("us-west1", null);
        assertNull(admin.clusters().getCluster("us-west1").getPeerClusterNames());

        // Check name validation
        try {
            admin.clusters().updatePeerClusterNames("us-west1",
                    new LinkedHashSet<>(List.of("invalid-cluster")));
            fail("should have failed");
        } catch (PulsarAdminException e) {
            assertTrue(e instanceof PreconditionFailedException);
        }

        // Cluster itself can't be part of peer-list
        try {
            admin.clusters().updatePeerClusterNames("us-west1", new LinkedHashSet<>(List.of("us-west1")));
            fail("should have failed");
        } catch (PulsarAdminException e) {
            assertTrue(e instanceof PreconditionFailedException);
        }
    }

    /**
     * It validates that peer-cluster can't coexist in replication-cluster list
     *
     * @throws Exception
     */
    @Test
    public void testReplicationPeerCluster() throws Exception {
        restartClusterAfterTest();

        admin.clusters().createCluster("us-west1",
                ClusterData.builder().serviceUrl("http://broker.messaging.west1.example.com:8080").build());
        admin.clusters().createCluster("us-west2",
                ClusterData.builder().serviceUrl("http://broker.messaging.west2.example.com:8080").build());
        admin.clusters().createCluster("us-west3",
                ClusterData.builder().serviceUrl("http://broker.messaging.west2.example.com:8080").build());
        admin.clusters().createCluster("us-west4",
                ClusterData.builder().serviceUrl("http://broker.messaging.west2.example.com:8080").build());
        admin.clusters().createCluster("us-east1",
                ClusterData.builder().serviceUrl("http://broker.messaging.east1.example.com:8080").build());
        admin.clusters().createCluster("us-east2",
                ClusterData.builder().serviceUrl("http://broker.messaging.east2.example.com:8080").build());
        admin.clusters().createCluster("global", ClusterData.builder().build());

        List<String> allClusters = admin.clusters().getClusters();
        Collections.sort(allClusters);
        assertEquals(allClusters,
                List.of("test", "us-east1", "us-east2", "us-west1", "us-west2", "us-west3", "us-west4"));

        final String property = newUniqueName("peer-prop");
        Set<String> allowedClusters = Set.of("us-west1", "us-west2", "us-west3", "us-west4", "us-east1",
                "us-east2", "global");
        TenantInfoImpl propConfig = new TenantInfoImpl(Set.of("test"), allowedClusters);
        admin.tenants().createTenant(property, propConfig);

        final String namespace = property + "/global/conflictPeer";
        admin.namespaces().createNamespace(namespace);

        admin.clusters().updatePeerClusterNames("us-west1",
                new LinkedHashSet<>(List.of("us-west2", "us-west3")));
        assertEquals(admin.clusters().getCluster("us-west1").getPeerClusterNames(),
                List.of("us-west2", "us-west3"));

        // (1) no conflicting peer
        Set<String> clusterIds = Set.of("us-east1", "us-east2");
        admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds);

        // (2) conflicting peer
        clusterIds = Set.of("us-west2", "us-west3", "us-west1");
        try {
            admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds);
            fail("Peer-cluster can't coexist in replication cluster list");
        } catch (PulsarAdminException.ConflictException e) {
            // Ok
        }

        clusterIds = Set.of("us-west2", "us-west3");
        // no peer coexist in replication clusters
        admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds);

        clusterIds = Set.of("us-west1", "us-west4");
        // no peer coexist in replication clusters
        admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds);
    }

    @Test
    public void clusterFailureDomain() throws PulsarAdminException {

        final String cluster = pulsar.getConfiguration().getClusterName();
        // create
        FailureDomain domain = FailureDomain.builder()
                .brokers(Set.of("b1", "b2", "b3"))
                .build();
        admin.clusters().createFailureDomain(cluster, "domain-1", domain);
        admin.clusters().updateFailureDomain(cluster, "domain-1", domain);

        assertEquals(admin.clusters().getFailureDomain(cluster, "domain-1"), domain);

        Map<String, FailureDomain> domains = admin.clusters().getFailureDomains(cluster);
        assertEquals(domains.size(), 1);
        assertTrue(domains.containsKey("domain-1"));

        try {
            // try to create domain with already registered brokers
            admin.clusters().createFailureDomain(cluster, "domain-2", domain);
            fail("should have failed because of brokers are already registered");
        } catch (PulsarAdminException.ConflictException e) {
            // Ok
        }

        admin.clusters().deleteFailureDomain(cluster, "domain-1");
        assertTrue(admin.clusters().getFailureDomains(cluster).isEmpty());

        admin.clusters().createFailureDomain(cluster, "domain-2", domain);
        domains = admin.clusters().getFailureDomains(cluster);
        assertEquals(domains.size(), 1);
        assertTrue(domains.containsKey("domain-2"));
    }

    @Test
    public void namespaceAntiAffinity() throws PulsarAdminException {
        final String namespace = defaultNamespace;
        final String antiAffinityGroup = "group";
        assertTrue(isBlank(admin.namespaces().getNamespaceAntiAffinityGroup(namespace)));
        admin.namespaces().setNamespaceAntiAffinityGroup(namespace, antiAffinityGroup);
        assertEquals(admin.namespaces().getNamespaceAntiAffinityGroup(namespace), antiAffinityGroup);
        admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace);
        assertTrue(isBlank(admin.namespaces().getNamespaceAntiAffinityGroup(namespace)));

        final String ns1 = defaultTenant + "/antiAG1";
        final String ns2 = defaultTenant + "/antiAG2";
        final String ns3 = defaultTenant + "/antiAG3";
        admin.namespaces().createNamespace(ns1, Set.of("test"));
        admin.namespaces().createNamespace(ns2, Set.of("test"));
        admin.namespaces().createNamespace(ns3, Set.of("test"));
        admin.namespaces().setNamespaceAntiAffinityGroup(ns1, antiAffinityGroup);
        admin.namespaces().setNamespaceAntiAffinityGroup(ns2, antiAffinityGroup);
        admin.namespaces().setNamespaceAntiAffinityGroup(ns3, antiAffinityGroup);

        Set<String> namespaces = new HashSet<>(
                admin.namespaces().getAntiAffinityNamespaces(defaultTenant, "test", antiAffinityGroup));
        assertEquals(namespaces.size(), 3);
        assertTrue(namespaces.contains(ns1));
        assertTrue(namespaces.contains(ns2));
        assertTrue(namespaces.contains(ns3));

        List<String> namespaces2 = admin.namespaces().getAntiAffinityNamespaces(defaultTenant, "test", "invalid-group");
        assertEquals(namespaces2.size(), 0);
    }

    @Test
    public void testPersistentTopicList() throws Exception {
        final String namespace = newUniqueName(defaultTenant + "/ns2");
        final String topicName = "non-persistent://" + namespace + "/bundle-topic";
        admin.namespaces().createNamespace(namespace, 20);
        admin.namespaces().setNamespaceReplicationClusters(namespace, Set.of("test"));
        int totalTopics = 100;

        Set<String> topicNames = new HashSet<>();
        for (int i = 0; i < totalTopics; i++) {
            topicNames.add(topicName + i);
            Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName + i).create();
            producer.close();
        }

        Set<String> topics = new HashSet<>();
        String bundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
                .getBundle(TopicName.get(topicName + "0")).getBundleRange();
        for (int i = 0; i < totalTopics; i++) {
            Topic topic = pulsar.getBrokerService().getTopicReference(topicName + i).get();
            if (bundle.equals(pulsar.getNamespaceService().getNamespaceBundleFactory()
                    .getBundle(TopicName.get(topicName + i)).getBundleRange())) {
                topics.add(topic.getName());
            }
        }

        Set<String> topicsInNs = Sets
                .newHashSet(
                        admin.topics().getList(namespace, null, Collections.singletonMap(QueryParam.Bundle, bundle)));
        assertEquals(topicsInNs.size(), topics.size());
        topicsInNs.removeAll(topics);
        assertEquals(topicsInNs.size(), 0);
    }

    @Test
    public void testCreateAndGetTopicProperties() throws Exception {
        final String namespace = newUniqueName(defaultTenant + "/ns2");
        final String nonPartitionedTopicName = "persistent://" + namespace + "/non-partitioned-TopicProperties";
        admin.namespaces().createNamespace(namespace, 20);
        Map<String, String> nonPartitionedTopicProperties = new HashMap<>();
        nonPartitionedTopicProperties.put("key1", "value1");
        admin.topics().createNonPartitionedTopic(nonPartitionedTopicName, nonPartitionedTopicProperties);
        Map<String, String> properties11 = admin.topics().getProperties(nonPartitionedTopicName);
        Assert.assertNotNull(properties11);
        Assert.assertEquals(properties11.get("key1"), "value1");

        final String partitionedTopicName = "persistent://" + namespace + "/partitioned-TopicProperties";
        Map<String, String> partitionedTopicProperties = new HashMap<>();
        partitionedTopicProperties.put("key2", "value2");
        admin.topics().createPartitionedTopic(partitionedTopicName, 2, partitionedTopicProperties);
        Map<String, String> properties22 = admin.topics().getProperties(partitionedTopicName);
        Assert.assertNotNull(properties22);
        Assert.assertEquals(properties22.get("key2"), "value2");
    }

    @Test
    public void testUpdatePartitionedTopicProperties() throws Exception {
        final String namespace = newUniqueName(defaultTenant + "/ns2");
        final String topicName = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties";
        final String topicNameTwo = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties2";
        admin.namespaces().createNamespace(namespace, 20);

        // create partitioned topic without properties
        admin.topics().createPartitionedTopic(topicName, 2);
        Map<String, String> properties = admin.topics().getProperties(topicName);
        Assert.assertNull(properties);
        Map<String, String> topicProperties = new HashMap<>();
        topicProperties.put("key1", "value1");
        admin.topics().updateProperties(topicName, topicProperties);
        properties = admin.topics().getProperties(topicName);
        Assert.assertNotNull(properties);
        Assert.assertEquals(properties.get("key1"), "value1");

        // update with new key, old properties should keep
        topicProperties = new HashMap<>();
        topicProperties.put("key2", "value2");
        admin.topics().updateProperties(topicName, topicProperties);
        properties = admin.topics().getProperties(topicName);
        Assert.assertNotNull(properties);
        Assert.assertEquals(properties.size(), 2);
        Assert.assertEquals(properties.get("key1"), "value1");
        Assert.assertEquals(properties.get("key2"), "value2");

        // override old values
        topicProperties = new HashMap<>();
        topicProperties.put("key1", "value11");
        admin.topics().updateProperties(topicName, topicProperties);
        properties = admin.topics().getProperties(topicName);
        Assert.assertNotNull(properties);
        Assert.assertEquals(properties.size(), 2);
        Assert.assertEquals(properties.get("key1"), "value11");
        Assert.assertEquals(properties.get("key2"), "value2");

        // create topic without properties
        admin.topics().createPartitionedTopic(topicNameTwo, 2);
        properties = admin.topics().getProperties(topicNameTwo);
        Assert.assertNull(properties);
        // remove key of properties on this topic
        admin.topics().removeProperties(topicNameTwo, "key1");
        properties = admin.topics().getProperties(topicNameTwo);
        Assert.assertNull(properties);
        Map<String, String> topicProp = new HashMap<>();
        topicProp.put("key1", "value1");
        topicProp.put("key2", "value2");
        admin.topics().updateProperties(topicNameTwo, topicProp);
        properties = admin.topics().getProperties(topicNameTwo);
        Assert.assertEquals(properties, topicProp);
        admin.topics().removeProperties(topicNameTwo, "key1");
        topicProp.remove("key1");
        properties = admin.topics().getProperties(topicNameTwo);
        Assert.assertEquals(properties, topicProp);
    }

    @Test
    public void testUpdateNonPartitionedTopicProperties() throws Exception {
        final String namespace = newUniqueName(defaultTenant + "/ns2");
        final String topicName = "persistent://" + namespace + "/testUpdateNonPartitionedTopicProperties";
        admin.namespaces().createNamespace(namespace, 20);

        // create non-partitioned topic with properties
        Map<String, String> topicProperties = new HashMap<>();
        topicProperties.put("key1", "value1");
        admin.topics().createNonPartitionedTopic(topicName, topicProperties);
        Map<String, String> properties = admin.topics().getProperties(topicName);
        Assert.assertNotNull(properties);
        Assert.assertEquals(properties.get("key1"), "value1");

        // update with new key, old properties should keep
        topicProperties = new HashMap<>();
        topicProperties.put("key2", "value2");
        admin.topics().updateProperties(topicName, topicProperties);
        properties = admin.topics().getProperties(topicName);
        Assert.assertNotNull(properties);
        Assert.assertEquals(properties.size(), 2);
        Assert.assertEquals(properties.get("key1"), "value1");
        Assert.assertEquals(properties.get("key2"), "value2");

        // override old values
        topicProperties = new HashMap<>();
        topicProperties.put("key1", "value11");
        admin.topics().updateProperties(topicName, topicProperties);
        properties = admin.topics().getProperties(topicName);
        Assert.assertNotNull(properties);
        Assert.assertEquals(properties.size(), 2);
        Assert.assertEquals(properties.get("key1"), "value11");
        Assert.assertEquals(properties.get("key2"), "value2");
    }

    @Test
    public void testNonPersistentTopics() throws Exception {
        final String namespace = newUniqueName(defaultTenant + "/ns2");
        final String topicName = "non-persistent://" + namespace + "/topic";
        admin.namespaces().createNamespace(namespace, 20);
        admin.namespaces().setNamespaceReplicationClusters(namespace, Set.of("test"));
        int totalTopics = 100;

        Set<String> topicNames = new HashSet<>();
        for (int i = 0; i < totalTopics; i++) {
            topicNames.add(topicName + i);
            Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName + i).create();
            producer.close();
        }

        for (int i = 0; i < totalTopics; i++) {
            Topic topic = pulsar.getBrokerService().getTopicReference(topicName + i).get();
            assertNotNull(topic);
        }

        Set<String> topicsInNs = new HashSet<>(admin.topics().getList(namespace));
        assertEquals(topicsInNs.size(), totalTopics);
        topicsInNs.removeAll(topicNames);
        assertEquals(topicsInNs.size(), 0);
    }

    @Test
    public void testPublishConsumerStats() throws Exception {
        final String topicName = "statTopic";
        final String subscriberName = topicName + "-my-sub-1";
        final String topic = "persistent://" + defaultNamespace + "/" + topicName;
        final String producerName = "myProducer";

        @Cleanup
        PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
        Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(subscriberName)
                .subscriptionType(SubscriptionType.Shared).subscribe();
        Producer<byte[]> producer = client.newProducer()
            .topic(topic)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .producerName(producerName)
            .create();

        retryStrategically((test) -> {
            TopicStats stats;
            try {
                stats = admin.topics().getStats(topic);
                return stats.getPublishers().size() > 0 && stats.getSubscriptions().get(subscriberName) != null
                        && stats.getSubscriptions().get(subscriberName).getConsumers().size() > 0;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 200);

        TopicStats topicStats = admin.topics().getStats(topic);
        assertEquals(topicStats.getPublishers().size(), 1);
        assertNotNull(topicStats.getPublishers().get(0).getAddress());
        assertNotNull(topicStats.getPublishers().get(0).getClientVersion());
        assertNotNull(topicStats.getPublishers().get(0).getConnectedSince());
        assertNotNull(topicStats.getPublishers().get(0).getProducerName());
        assertEquals(topicStats.getPublishers().get(0).getProducerName(), producerName);

        SubscriptionStats subscriber = topicStats.getSubscriptions().get(subscriberName);
        assertNotNull(subscriber);
        assertEquals(subscriber.getConsumers().size(), 1);
        ConsumerStats consumerStats = subscriber.getConsumers().get(0);
        assertNotNull(consumerStats.getAddress());
        assertNotNull(consumerStats.getClientVersion());
        assertNotNull(consumerStats.getConnectedSince());

        producer.close();
        consumer.close();
    }

    @Test
    public void testTenantNameWithUnderscore() throws Exception {
        restartClusterAfterTest();

        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
        admin.tenants().createTenant("prop_xyz", tenantInfo);

        admin.namespaces().createNamespace("prop_xyz/my-namespace", Set.of("test"));

        String topic = "persistent://prop_xyz/use/my-namespace/my-topic";

        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
                .create();

        TopicStats stats = admin.topics().getStats(topic);
        assertEquals(stats.getPublishers().size(), 1);
    }

    @Test
    public void testTenantNameWithInvalidCharacters() {
        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));

        // If we try to create property with invalid characters, it should fail immediately
        try {
            admin.tenants().createTenant("prop xyz", tenantInfo);
            fail("Should have failed");
        } catch (PulsarAdminException e) {
            assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
        }

        try {
            admin.tenants().createTenant("prop&xyz", tenantInfo);
            fail("Should have failed");
        } catch (PulsarAdminException e) {
            assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
        }
    }

    @Test
    public void testTenantWithNonexistentClusters() throws Exception {
        // Check non-existing cluster
        assertFalse(admin.clusters().getClusters().contains("cluster-non-existing"));

        Set<String> allowedClusters = Set.of("cluster-non-existing");
        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), allowedClusters);

        // If we try to create tenant with nonexistent clusters, it should fail immediately
        try {
            admin.tenants().createTenant("test-tenant", tenantInfo);
            fail("Should have failed");
        } catch (PulsarAdminException e) {
            assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
        }

        assertFalse(admin.tenants().getTenants().contains("test-tenant"));

        // Check existing tenant
        assertTrue(admin.tenants().getTenants().contains(defaultTenant));

        // If we try to update existing tenant with nonexistent clusters, it should fail immediately
        try {
            admin.tenants().updateTenant(defaultTenant, tenantInfo);
            fail("Should have failed");
        } catch (PulsarAdminException e) {
            assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
        }
    }

    @Test
    public void brokerNamespaceIsolationPolicies() throws Exception {

        // create
        String policyName1 = "policy-1";
        String cluster = pulsar.getConfiguration().getClusterName();
        String namespaceRegex = "other/" + cluster + "/other.*";
        String brokerName = pulsar.getAdvertisedAddress();
        String brokerAddress = pulsar.getBrokerId();

        Map<String, String> parameters1 = new HashMap<>();
        parameters1.put("min_limit", "1");
        parameters1.put("usage_threshold", "100");

        NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
                .namespaces(Collections.singletonList(namespaceRegex))
                .primary(Collections.singletonList(brokerName))
                .secondary(Collections.singletonList(brokerName + ".*"))
                .autoFailoverPolicy(AutoFailoverPolicyData.builder()
                        .policyType(AutoFailoverPolicyType.min_available)
                        .parameters(parameters1)
                        .build())
                .build();
        admin.clusters().createNamespaceIsolationPolicy(cluster, policyName1, nsPolicyData1);

        List<BrokerNamespaceIsolationData> brokerIsolationDataList = admin.clusters()
                .getBrokersWithNamespaceIsolationPolicy(cluster);
        assertEquals(brokerIsolationDataList.size(), 1);
        assertEquals(brokerIsolationDataList.get(0).getBrokerName(), brokerAddress);
        assertEquals(brokerIsolationDataList.get(0).getNamespaceRegex().size(), 1);
        assertEquals(brokerIsolationDataList.get(0).getNamespaceRegex().get(0), namespaceRegex);
        assertEquals(brokerIsolationDataList.get(0).getPolicyName(), policyName1);

        BrokerNamespaceIsolationDataImpl brokerIsolationData = (BrokerNamespaceIsolationDataImpl) admin.clusters()
                .getBrokerWithNamespaceIsolationPolicy(cluster, brokerAddress);
        assertEquals(brokerIsolationData.getBrokerName(), brokerAddress);
        assertEquals(brokerIsolationData.getNamespaceRegex().size(), 1);
        assertEquals(brokerIsolationData.getNamespaceRegex().get(0), namespaceRegex);

        BrokerNamespaceIsolationDataImpl isolationData = (BrokerNamespaceIsolationDataImpl) admin.clusters()
                .getBrokerWithNamespaceIsolationPolicy(cluster, "invalid-broker");
        assertFalse(isolationData.isPrimary());

        admin.clusters().deleteNamespaceIsolationPolicy(cluster, policyName1);
    }

    // create 1 namespace:
    //  0. without isolation policy configured, lookup will success.
    //  1. with matched isolation broker configured and matched, lookup will success.
    //  2. update isolation policy, without broker matched, lookup will fail.
    @Test
    public void brokerNamespaceIsolationPoliciesUpdateOnTime() throws Exception {
        String brokerName = pulsar.getAdvertisedAddress();
        String ns1Name = defaultTenant + "/test_ns1_iso_" + System.currentTimeMillis();
        admin.namespaces().createNamespace(ns1Name, Set.of("test"));

        //  0. without isolation policy configured, lookup will success.
        String brokerUrl = admin.lookups().lookupTopic(ns1Name + "/topic1");
        assertTrue(brokerUrl.contains(brokerName));
        log.info("0 get lookup url {}", brokerUrl);

        // create
        String policyName1 = "policy-1";
        String cluster = pulsar.getConfiguration().getClusterName();

        Map<String, String> parameters1 = new HashMap<>();
        parameters1.put("min_limit", "1");
        parameters1.put("usage_threshold", "100");

        final List<String> primaryList = new ArrayList<>();
        primaryList.add(brokerName + ".*");
        NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
                .namespaces(Collections.singletonList(ns1Name))
                .primary(primaryList)
                .autoFailoverPolicy(AutoFailoverPolicyData.builder()
                        .policyType(AutoFailoverPolicyType.min_available)
                        .parameters(parameters1)
                        .build())
                .build();
        admin.clusters().createNamespaceIsolationPolicyAsync(cluster, policyName1, nsPolicyData1).get();

        //  1. with matched isolation broker configured and matched, lookup will success.
        brokerUrl = admin.lookups().lookupTopic(ns1Name + "/topic2");
        assertTrue(brokerUrl.contains(brokerName));
        log.info(" 1 get lookup url {}", brokerUrl);

        //  2. update isolation policy, without broker matched, lookup will fail.
        nsPolicyData1.getPrimary().clear();
        nsPolicyData1.getPrimary().add(brokerName + "not_match");
        admin.clusters().updateNamespaceIsolationPolicyAsync(cluster, policyName1, nsPolicyData1).get();

        try {
            admin.lookups().lookupTopic(ns1Name + "/topic3");
            fail();
        } catch (Exception e) {
            // expected lookup fail, because no brokers matched the policy.
            log.info(" 2 expected fail lookup");
        }

        try {
            admin.lookups().lookupTopic(ns1Name + "/topic1");
            fail();
        } catch (Exception e) {
            // expected lookup fail, because no brokers matched the policy.
            log.info(" 22 expected fail lookup");
        }

        admin.clusters().deleteNamespaceIsolationPolicy(cluster, policyName1);
    }

    @Test
    public void clustersList() throws PulsarAdminException {
        final String cluster = pulsar.getConfiguration().getClusterName();
        admin.clusters().createCluster("global", ClusterData.builder()
                .serviceUrl("http://localhost:6650").build());

        // Global cluster, if there, should be omitted from the results
        assertEquals(admin.clusters().getClusters(), List.of(cluster));
    }
    /**
     * verifies cluster has been set before create topic
     *
     * @throws PulsarAdminException
     */
    @Test
    public void testClusterIsReadyBeforeCreateTopic() throws Exception {
        restartClusterAfterTest();
        final String topicName = "partitionedTopic";
        final int partitions = 4;
        final String persistentPartitionedTopicName = "persistent://" + defaultTenant + "/ns2/" + topicName;
        final String NonPersistentPartitionedTopicName = "non-persistent://" + defaultTenant + "/ns2/" + topicName;

        admin.namespaces().createNamespace(defaultTenant + "/ns2");
        // By default the cluster will configure as configuration file. So the create topic operation
        // will never throw exception except there is no cluster.
        admin.namespaces().setNamespaceReplicationClusters(defaultTenant + "/ns2", Sets.newHashSet(configClusterName));

        admin.topics().createPartitionedTopic(persistentPartitionedTopicName, partitions);
        admin.topics().createPartitionedTopic(NonPersistentPartitionedTopicName, partitions);
    }

    @Test
    public void testCreateNamespaceWithNoClusters() throws PulsarAdminException {
        String localCluster = pulsar.getConfiguration().getClusterName();
        String namespace = newUniqueName(defaultTenant + "/test-ns-with-no-clusters");
        admin.namespaces().createNamespace(namespace);

        // Global cluster, if there, should be omitted from the results
        assertEquals(admin.namespaces().getNamespaceReplicationClusters(namespace),
                Collections.singletonList(localCluster));
    }

    @Test(timeOut = 30000)
    public void testConsumerStatsLastTimestamp() throws PulsarClientException, PulsarAdminException, InterruptedException {
        long timestamp = System.currentTimeMillis();
        final String topicName = "consumer-stats-" + timestamp;
        final String subscribeName = topicName + "-test-stats-sub";
        final String topic = "persistent://" + defaultNamespace + "/" + topicName;
        final String producerName = "producer-" + topicName;

        @Cleanup
        PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
        Producer<byte[]> producer = client.newProducer().topic(topic)
            .enableBatching(false)
            .producerName(producerName)
            .create();

        // a. Send a message to the topic.
        producer.send("message-1".getBytes(StandardCharsets.UTF_8));

        // b. Create a consumer, because there was a message in the topic, the consumer will receive the message pushed
        // by the broker, the lastConsumedTimestamp will as the consume subscribe time.
        Consumer<byte[]> consumer = client.newConsumer().topic(topic)
            .subscriptionName(subscribeName)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
            .subscribe();
        Message<byte[]> message = consumer.receive();

        // Get the consumer stats.
        TopicStats topicStats = admin.topics().getStats(topic);
        SubscriptionStats subscriptionStats = topicStats.getSubscriptions().get(subscribeName);
        long startConsumedFlowTimestamp = subscriptionStats.getLastConsumedFlowTimestamp();
        long startAckedTimestampInSubStats = subscriptionStats.getLastAckedTimestamp();
        ConsumerStats consumerStats = subscriptionStats.getConsumers().get(0);
        long startConsumedTimestampInConsumerStats = consumerStats.getLastConsumedTimestamp();
        long startAckedTimestampInConsumerStats = consumerStats.getLastAckedTimestamp();

        // Because the message was pushed by the broker, the consumedTimestamp should not as 0.
        assertNotEquals(0, startConsumedTimestampInConsumerStats);
        // There is no consumer ack the message, so the lastAckedTimestamp still as 0.
        assertEquals(0, startAckedTimestampInConsumerStats);
        assertNotEquals(0, startConsumedFlowTimestamp);
        assertEquals(0, startAckedTimestampInSubStats);

        // c. The Consumer receives the message and acks the message.
        consumer.acknowledge(message);
        // Waiting for the ack command send to the broker.
        while (true) {
            topicStats = admin.topics().getStats(topic);
            if (topicStats.getSubscriptions().get(subscribeName).getLastAckedTimestamp() != 0) {
                break;
            }
            TimeUnit.MILLISECONDS.sleep(100);
        }

        // Get the consumer stats.
        topicStats = admin.topics().getStats(topic);
        subscriptionStats = topicStats.getSubscriptions().get(subscribeName);
        long consumedFlowTimestamp = subscriptionStats.getLastConsumedFlowTimestamp();
        long ackedTimestampInSubStats = subscriptionStats.getLastAckedTimestamp();
        consumerStats = subscriptionStats.getConsumers().get(0);
        long consumedTimestamp = consumerStats.getLastConsumedTimestamp();
        long ackedTimestamp = consumerStats.getLastAckedTimestamp();

        // The lastConsumedTimestamp should same as the last time because the broker does not push any messages and the
        // consumer does not pull any messages.
        assertEquals(startConsumedTimestampInConsumerStats, consumedTimestamp);
        assertTrue(startAckedTimestampInConsumerStats < ackedTimestamp);
        assertNotEquals(0, consumedFlowTimestamp);
        assertTrue(startAckedTimestampInSubStats < ackedTimestampInSubStats);

        // d. Send another messages. The lastConsumedTimestamp should be updated.
        producer.send("message-2".getBytes(StandardCharsets.UTF_8));

        // e. Receive the message and ack it.
        message = consumer.receive();
        consumer.acknowledge(message);
        // Waiting for the ack command send to the broker.
        while (true) {
            topicStats = admin.topics().getStats(topic);
            if (topicStats.getSubscriptions().get(subscribeName).getLastAckedTimestamp() != ackedTimestampInSubStats) {
                break;
            }
            TimeUnit.MILLISECONDS.sleep(100);
        }

        // Get the consumer stats again.
        topicStats = admin.topics().getStats(topic);
        subscriptionStats = topicStats.getSubscriptions().get(subscribeName);
        long lastConsumedFlowTimestamp = subscriptionStats.getLastConsumedFlowTimestamp();
        long lastConsumedTimestampInSubStats = subscriptionStats.getLastConsumedTimestamp();
        long lastAckedTimestampInSubStats = subscriptionStats.getLastAckedTimestamp();
        consumerStats = subscriptionStats.getConsumers().get(0);
        long lastConsumedTimestamp = consumerStats.getLastConsumedTimestamp();
        long lastAckedTimestamp = consumerStats.getLastAckedTimestamp();

        assertTrue(consumedTimestamp < lastConsumedTimestamp);
        assertTrue(ackedTimestamp < lastAckedTimestamp);
        assertTrue(startConsumedTimestampInConsumerStats < lastConsumedTimestamp);
        assertEquals(lastConsumedFlowTimestamp, consumedFlowTimestamp);
        assertTrue(ackedTimestampInSubStats < lastAckedTimestampInSubStats);
        assertEquals(lastConsumedTimestamp, lastConsumedTimestampInSubStats);

        consumer.close();
        producer.close();
    }

    @Test(timeOut = 30000)
    public void testPreciseBacklog() throws Exception {
        restartClusterIfReused();

        final String topic = "persistent://" + defaultNamespace + "/precise-back-log";
        final String subName = "sub-name";

        @Cleanup
        PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();

        @Cleanup
        Consumer<byte[]> consumer = client.newConsumer()
            .topic(topic)
            .subscriptionName(subName)
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
            .subscribe();

        @Cleanup
        Producer<byte[]> producer = client.newProducer()
            .topic(topic)
            .enableBatching(false)
            .create();

        producer.send("message-1".getBytes(StandardCharsets.UTF_8));
        Message<byte[]> message = consumer.receive();
        assertNotNull(message);

        // Mock the entries added count. Default is disable the precise backlog, so the backlog is entries added count - consumed count
        // Since message have not acked, so the backlog is 10
        PersistentSubscription subscription = (PersistentSubscription)pulsar.getBrokerService().getTopicReference(topic).get().getSubscription(subName);
        assertNotNull(subscription);
        ((ManagedLedgerImpl)subscription.getCursor().getManagedLedger()).setEntriesAddedCounter(10L);
        TopicStats topicStats = admin.topics().getStats(topic);
        assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);

        topicStats = admin.topics().getStats(topic, true, true);
        assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 40);
        assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 1);
        consumer.acknowledge(message);

        // wait for ack send
        Awaitility.await().untilAsserted(() -> {
            // Consumer acks the message, so the precise backlog is 0
            TopicStats topicStats2 = admin.topics().getStats(topic, true, true);
            assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 0);
            assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 0);
        });

        topicStats = admin.topics().getStats(topic);
        assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 9);
    }

    @Test
    public void testDeleteTenant() throws Exception {
        restartClusterAfterTest();
        // Disabled conf: systemTopicEnabled. see: https://github.com/apache/pulsar/pull/17070
        boolean originalSystemTopicEnabled = conf.isSystemTopicEnabled();
        if (originalSystemTopicEnabled) {
            cleanup();
            conf.setSystemTopicEnabled(false);
            setup();
        }
        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);

        String tenant = newUniqueName("test-tenant-1");
        assertFalse(admin.tenants().getTenants().contains(tenant));

        // create tenant
        admin.tenants().createTenant(tenant,
                new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")));
        assertTrue(admin.tenants().getTenants().contains(tenant));

        // create namespace
        String namespace = tenant + "/test-ns-1";
        admin.namespaces().createNamespace(namespace, Set.of("test"));
        assertEquals(admin.namespaces().getNamespaces(tenant), List.of(namespace));

        // create topic
        String topic = namespace + "/test-topic-1";
        admin.topics().createPartitionedTopic(topic, 10);
        assertFalse(admin.topics().getList(namespace).isEmpty());

        try {
            admin.namespaces().deleteNamespace(namespace, false);
            fail("should have failed due to namespace not empty");
        } catch (PulsarAdminException e) {
            // Expected: cannot delete non-empty tenant
        }

        // delete topic
        admin.topics().deletePartitionedTopic(topic);
        assertTrue(admin.topics().getList(namespace).isEmpty());

        // delete namespace
        deleteNamespaceWithRetry(namespace, false);
        assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
        assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());

        // delete tenant
        admin.tenants().deleteTenant(tenant);
        assertFalse(admin.tenants().getTenants().contains(tenant));

        final String managedLedgersPath = "/managed-ledgers/" + tenant;
        final String partitionedTopicPath = "/admin/partitioned-topics/" + tenant;
        final String localPoliciesPath = "/admin/local-policies/" + tenant;
        final String bundleDataPath = BUNDLE_DATA_BASE_PATH + "/" + tenant;
        assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
        assertFalse(pulsar.getLocalMetadataStore().exists(partitionedTopicPath).join());
        assertFalse(pulsar.getLocalMetadataStore().exists(localPoliciesPath).join());
        assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
    }

    @Data
    @AllArgsConstructor
    private static class NamespaceAttr {
        private boolean systemTopicEnabled;
        private TopicType autoTopicCreationType;
        private int defaultNumPartitions;
        private boolean forceDeleteNamespaceAllowed;
    }

    @DataProvider(name = "namespaceAttrs")
    public Object[][] namespaceAttributes(){
        return new Object[][]{
                {new NamespaceAttr(false, TopicType.NON_PARTITIONED, 0, false)},
                {new NamespaceAttr(true, TopicType.NON_PARTITIONED, 0, false)},
                {new NamespaceAttr(true, TopicType.PARTITIONED, 3, false)}
        };
    }

    private NamespaceAttr markOriginalNamespaceAttr(){
        return new NamespaceAttr(conf.isSystemTopicEnabled(), conf.getAllowAutoTopicCreationType(),
                conf.getDefaultNumPartitions(), conf.isForceDeleteNamespaceAllowed());
    }

    private void setNamespaceAttr(NamespaceAttr namespaceAttr){
        conf.setSystemTopicEnabled(namespaceAttr.systemTopicEnabled);
        conf.setAllowAutoTopicCreationType(namespaceAttr.autoTopicCreationType);
        conf.setDefaultNumPartitions(namespaceAttr.defaultNumPartitions);
        conf.setForceDeleteNamespaceAllowed(namespaceAttr.forceDeleteNamespaceAllowed);
    }

    @Test(dataProvider = "namespaceAttrs")
    public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception {
        restartClusterAfterTest();

        // Set conf.
        cleanup();
        setNamespaceAttr(namespaceAttr);
        this.conf.setMetadataStoreUrl("127.0.0.1:2181");
        this.conf.setConfigurationMetadataStoreUrl("127.0.0.1:2182");
        setup();

        String tenant = newUniqueName("test-tenant");
        assertFalse(admin.tenants().getTenants().contains(tenant));

        // create tenant
        admin.tenants().createTenant(tenant,
                new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")));
        assertTrue(admin.tenants().getTenants().contains(tenant));

        // create namespace
        String namespace = tenant + "/test-ns";
        admin.namespaces().createNamespace(namespace, Set.of("test"));
        assertEquals(admin.namespaces().getNamespaces(tenant), List.of(namespace));

        // create topic
        String topic = namespace + "/test-topic";
        admin.topics().createPartitionedTopic(topic, 10);
        assertFalse(admin.topics().getList(namespace).isEmpty());

        final String managedLedgersPath = "/managed-ledgers/" + namespace;
        final String bundleDataPath = BUNDLE_DATA_BASE_PATH + "/" + namespace;
        // Trigger bundle owned by brokers.
        pulsarClient.newProducer().topic(topic).create().close();
        // Trigger bundle data write to ZK.
        Awaitility.await().untilAsserted(() -> {
            boolean bundleDataWereWriten = false;
            for (PulsarService ps : new PulsarService[]{pulsar, mockPulsarSetup.getPulsar()}) {
                ModularLoadManagerWrapper loadManager = (ModularLoadManagerWrapper) ps.getLoadManager().get();
                ModularLoadManagerImpl loadManagerImpl = (ModularLoadManagerImpl) loadManager.getLoadManager();
                ps.getBrokerService().updateRates();
                loadManagerImpl.updateLocalBrokerData();
                loadManagerImpl.writeBundleDataOnZooKeeper();
                bundleDataWereWriten = bundleDataWereWriten || ps.getLocalMetadataStore().exists(bundleDataPath).join();
            }
            assertTrue(bundleDataWereWriten);
        });

        // assert znode exists in metadata store
        assertTrue(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
        assertTrue(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());

        try {
            admin.namespaces().deleteNamespace(namespace, false);
            fail("should have failed due to namespace not empty");
        } catch (PulsarAdminException e) {
            // Expected: cannot delete non-empty tenant
        }

        // delete topic
        admin.topics().deletePartitionedTopic(topic);
        assertTrue(admin.topics().getList(namespace).isEmpty());

        // delete namespace
        deleteNamespaceWithRetry(namespace, false);
        assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
        assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());

        // assert znode deleted in metadata store
        assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
        assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
    }

    @Test
    public void testDeleteNamespaceWithTopicPolicies() throws Exception {
        String tenant = newUniqueName("test-tenant");
        assertFalse(admin.tenants().getTenants().contains(tenant));

        // create tenant
        admin.tenants().createTenant(tenant,
                new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")));
        assertTrue(admin.tenants().getTenants().contains(tenant));

        // create namespace2
        String namespace = tenant + "/test-ns2";
        admin.namespaces().createNamespace(namespace, Set.of("test"));
        admin.topics().createNonPartitionedTopic(namespace + "/tobedeleted");
        // verify namespace can be deleted even without topic policy events
        admin.namespaces().deleteNamespace(namespace, true);

        Awaitility.await().untilAsserted(() -> {
            final CompletableFuture<Optional<Topic>> eventTopicFuture =
                    pulsar.getBrokerService().getTopics().get("persistent://test-tenant/test-ns2/__change_events");
            assertNull(eventTopicFuture);
        });
        admin.namespaces().createNamespace(namespace, Set.of("test"));
        // create topic
        String topic = namespace + "/test-topic2";
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
        producer.send("test".getBytes(StandardCharsets.UTF_8));
        BacklogQuota backlogQuota = BacklogQuotaImpl
                .builder()
                .limitTime(1000)
                .limitSize(1000)
                .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                .build();
        admin.topicPolicies().setBacklogQuota(topic, backlogQuota);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(admin.topicPolicies()
                    .getBacklogQuotaMap(topic)
                    .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota);
        });
        producer.close();
        admin.topics().delete(topic);
        deleteNamespaceWithRetry(namespace, false);
        Awaitility.await().untilAsserted(() -> {
            assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
        });
    }


    @Test(timeOut = 30000)
    public void testBacklogNoDelayed() throws PulsarClientException, PulsarAdminException, InterruptedException {
        final String topic = "persistent://" + defaultNamespace + "/precise-back-log-no-delayed-" + UUID.randomUUID().toString();
        final String subName = "sub-name";

        @Cleanup
        PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();

        @Cleanup
        Consumer<byte[]> consumer = client.newConsumer()
            .topic(topic)
            .subscriptionName(subName)
            .subscriptionType(SubscriptionType.Shared)
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
            .subscribe();

        @Cleanup
        Producer<byte[]> producer = client.newProducer()
            .topic(topic)
            .enableBatching(false)
            .create();

        for (int i = 0; i < 10; i++) {
            if (i > 4) {
                producer.newMessage()
                    .value("message-1".getBytes(StandardCharsets.UTF_8))
                    .deliverAfter(10, TimeUnit.SECONDS)
                    .send();
            } else {
                producer.send("message-1".getBytes(StandardCharsets.UTF_8));
            }
        }

        // Wait for messages to be tracked for delayed delivery. This happens
        // on the consumer dispatch side, so when the send() is complete we're
        // not yet guaranteed to see the stats updated.
        Awaitility.await().untilAsserted(() -> {
            TopicStats topicStats = admin.topics().getStats(topic, true, true);
            assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
            assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 5);
        });


        for (int i = 0; i < 5; i++) {
            consumer.acknowledge(consumer.receive());
        }

        // Wait the ack send.
        Awaitility.await().untilAsserted(() -> {
            TopicStats topicStats = admin.topics().getStats(topic, true, true);
            assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 5);
            assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 0);
        });

    }

    @Test
    public void testPreciseBacklogForPartitionedTopic() throws PulsarClientException, PulsarAdminException {
        final String topic = "persistent://" + defaultNamespace + "/precise-back-log-for-partitioned-topic";
        admin.topics().createPartitionedTopic(topic, 2);
        final String subName = "sub-name";

        @Cleanup
        PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();

        @Cleanup
        Consumer<byte[]> consumer = client.newConsumer()
            .topic(topic)
            .subscriptionName(subName)
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
            .subscribe();

        @Cleanup
        Producer<byte[]> producer = client.newProducer()
            .topic(topic)
            .enableBatching(false)
            .create();

        producer.send("message-1".getBytes(StandardCharsets.UTF_8));
        Message<byte[]> message = consumer.receive();
        assertNotNull(message);

        // Mock the entries added count. Default is disable the precise backlog, so the backlog is entries added count - consumed count
        // Since message have not acked, so the backlog is 10
        for (int i = 0; i < 2; i++) {
            PersistentSubscription subscription = (PersistentSubscription)pulsar.getBrokerService().getTopicReference(topic + "-partition-" + i).get().getSubscription(subName);
            assertNotNull(subscription);
            ((ManagedLedgerImpl)subscription.getCursor().getManagedLedger()).setEntriesAddedCounter(10L);
        }

        TopicStats topicStats = admin.topics().getPartitionedStats(topic, false);
        assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 20);

        topicStats = admin.topics().getPartitionedStats(topic, false, true, true, true);
        assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 1);
        assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 40);
    }

    @Test(timeOut = 30000)
    public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientException, PulsarAdminException, InterruptedException {
        final String topic = "persistent://" + defaultNamespace + "/precise-back-log-no-delayed-partitioned-topic";
        admin.topics().createPartitionedTopic(topic, 2);
        final String subName = "sub-name";

        @Cleanup
        PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();

        @Cleanup
        Consumer<byte[]> consumer = client.newConsumer()
            .topic(topic)
            .subscriptionName(subName)
            .subscriptionType(SubscriptionType.Shared)
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
            .subscribe();

        long start1 = 0;
        long start2 = 0;
        @Cleanup
        Producer<byte[]> producer = client.newProducer()
            .topic(topic)
            .enableBatching(false)
            .create();

        for (int i = 0; i < 10; i++) {
            if (i == 0) {
                start1 = Clock.systemUTC().millis();
            }
            if (i == 5) {
                start2 = Clock.systemUTC().millis();
            }
            if (i > 4) {
                producer.newMessage()
                    .value("message-1".getBytes(StandardCharsets.UTF_8))
                    .deliverAfter(10, TimeUnit.SECONDS)
                    .send();
            } else {
                producer.send("message-1".getBytes(StandardCharsets.UTF_8));
            }
        }
        // wait until the message add to delay queue.
        long finalStart1 = start1;
        Awaitility.await().untilAsserted(() -> {
            TopicStats topicStats = admin.topics().getPartitionedStats(topic, false, true, true, true);
            assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
            assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 440);
            assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 5);
            assertTrue(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog() >= finalStart1);
        });

        for (int i = 0; i < 5; i++) {
            consumer.acknowledge(consumer.receive());
        }
        // Wait the ack send.
        long finalStart2 = start2;
        Awaitility.await().timeout(1, MINUTES).untilAsserted(() -> {
            TopicStats topicStats2 = admin.topics().getPartitionedStats(topic, false, true, true, true);
            assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 5);
            assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 223);
            assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 0);
            assertTrue(topicStats2.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog() >= finalStart2);
        });

    }

    @Test
    public void testMaxNumPartitionsPerPartitionedTopicSuccess() {
        restartClusterAfterTest();
        final String topic = "persistent://" + defaultNamespace + "/max-num-partitions-per-partitioned-topic-success";
        pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(3);

        try {
            admin.topics().createPartitionedTopic(topic, 2);
        } catch (Exception e) {
            fail("should not throw any exceptions");
        }

        // reset configuration
        pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0);
    }

    @Test
    public void testMaxNumPartitionsPerPartitionedTopicFailure() {
        restartClusterAfterTest();
        final String topic = "persistent://" + defaultNamespace + "/max-num-partitions-per-partitioned-topic-failure";
        pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(2);

        try {
            admin.topics().createPartitionedTopic(topic, 3);
            fail("should throw exception when number of partitions exceed than max partitions");
        } catch (Exception e) {
            assertTrue(e instanceof PulsarAdminException);
        }

        // reset configuration
        pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0);
    }

    @Test
    public void testListOfNamespaceBundles() throws Exception {
        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
        String tenantName = newUniqueName("prop-xyz2");
        admin.tenants().createTenant(tenantName, tenantInfo);
        admin.namespaces().createNamespace(tenantName + "/ns1", 10);
        admin.namespaces().setNamespaceReplicationClusters(tenantName + "/ns1", Set.of("test"));
        admin.namespaces().createNamespace(tenantName + "/test/ns2", 10);
        assertEquals(admin.namespaces().getBundles(tenantName + "/ns1").getNumBundles(), 10);
        assertEquals(admin.namespaces().getBundles(tenantName + "/test/ns2").getNumBundles(), 10);

        admin.namespaces().deleteNamespace(tenantName + "/test/ns2");
    }

    @Test
    public void testForceDeleteNamespace() throws Exception {
        restartClusterAfterTest();
        String tenantName = newUniqueName("prop-xyz2");
        final String namespaceName = tenantName + "/ns1";
        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
        admin.tenants().createTenant(tenantName, tenantInfo);
        admin.namespaces().createNamespace(namespaceName, 1);
        final String topic = "persistent://" + namespaceName + "/test" + UUID.randomUUID();
        pulsarClient.newProducer(Schema.DOUBLE).topic(topic).create().close();
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.schemas().getSchemaInfo(topic)));
        deleteNamespaceWithRetry(namespaceName, true);
        try {
            admin.schemas().getSchemaInfo(topic);
            Assert.fail("fail");
        } catch (PulsarAdminException e) {
            assertEquals(e.getStatusCode(), 404);
        }
    }

    @Test
    public void testForceDeleteNamespaceWithAutomaticTopicCreation() throws Exception {
        conf.setForceDeleteNamespaceAllowed(true);
        String tenantName = newUniqueName("prop-xyz2");
        final String namespaceName = tenantName + "/ns1";
        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
        admin.tenants().createTenant(tenantName, tenantInfo);
        admin.namespaces().createNamespace(namespaceName, 1);
        admin.namespaces().setAutoTopicCreation(namespaceName,
                AutoTopicCreationOverride.builder()
                        .allowAutoTopicCreation(true)
                        .topicType("partitioned")
                        .defaultNumPartitions(20)
                        .build());
        final String topic = "persistent://" + namespaceName + "/test" + UUID.randomUUID();

        // start a consumer, that creates the topic
        try (Consumer<Double> consumer = pulsarClient.newConsumer(Schema.DOUBLE).topic(topic)
                .subscriptionName("test").autoUpdatePartitions(true).subscribe()) {

            // wait for the consumer to settle
            Awaitility.await().ignoreExceptions().untilAsserted(() ->
                    assertNotNull(admin.topics().getSubscriptions(topic).contains("test")));

            // verify that the partitioned topic is created
            assertEquals(20, admin.topics().getPartitionedTopicMetadata(topic).partitions);

            // the consumer will race with the deletion
            // the consumer will try to re-create the partitions
            admin.namespaces().deleteNamespace(namespaceName, true);

            assertFalse(admin.namespaces().getNamespaces(tenantName).contains("ns1"));
        }
    }

    @Test
    public void testUpdateClusterWithProxyUrl() throws Exception {
        ClusterData cluster = ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
        String clusterName = "test2";
        admin.clusters().createCluster(clusterName, cluster);
        Assert.assertEquals(admin.clusters().getCluster(clusterName), cluster);

        // update
        cluster = ClusterData.builder()
                .serviceUrl(pulsar.getWebServiceAddress())
                .proxyServiceUrl("pulsar://example.com")
                .proxyProtocol(ProxyProtocol.SNI)
                .build();
        admin.clusters().updateCluster(clusterName, cluster);
        Assert.assertEquals(admin.clusters().getCluster(clusterName), cluster);
    }

    @Test
    public void testMaxNamespacesPerTenant() throws Exception {
        restartClusterAfterTest();
        cleanup();
        conf.setMaxNamespacesPerTenant(2);
        setup();
        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
        admin.tenants().createTenant("testTenant", tenantInfo);
        admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
        admin.namespaces().createNamespace("testTenant/ns2", Set.of("test"));
        try {
            admin.namespaces().createNamespace("testTenant/ns3", Set.of("test"));
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 412);
            Assert.assertEquals(e.getHttpError(), "Exceed the maximum number of namespace in tenant :testTenant");
        }
    }

    @Test
    public void testAutoTopicCreationOverrideWithMaxNumPartitionsLimit() throws Exception{
        restartClusterAfterTest();
        cleanup();
        conf.setMaxNumPartitionsPerPartitionedTopic(10);
        setup();
        TenantInfoImpl tenantInfo = new TenantInfoImpl(
                Set.of("role1", "role2"), Set.of("test"));
        admin.tenants().createTenant("testTenant", tenantInfo);
        // test non-partitioned
        admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
        AutoTopicCreationOverride overridePolicy = AutoTopicCreationOverride
                .builder().allowAutoTopicCreation(true)
                .topicType("non-partitioned")
                .build();
        admin.namespaces().setAutoTopicCreation("testTenant/ns1", overridePolicy);
        AutoTopicCreationOverride newOverridePolicy =
                admin.namespaces().getAutoTopicCreation("testTenant/ns1");
        assertEquals(overridePolicy, newOverridePolicy);
        // test partitioned
        AutoTopicCreationOverride partitionedOverridePolicy = AutoTopicCreationOverride
                .builder().allowAutoTopicCreation(true)
                .topicType("partitioned")
                .defaultNumPartitions(10)
                .build();
        admin.namespaces().setAutoTopicCreation("testTenant/ns1", partitionedOverridePolicy);
        AutoTopicCreationOverride partitionedNewOverridePolicy =
                admin.namespaces().getAutoTopicCreation("testTenant/ns1");
        assertEquals(partitionedOverridePolicy, partitionedNewOverridePolicy);
        // test partitioned with error
        AutoTopicCreationOverride partitionedWrongOverridePolicy = AutoTopicCreationOverride
                .builder().allowAutoTopicCreation(true)
                .topicType("partitioned")
                .defaultNumPartitions(123)
                .build();
        try {
            admin.namespaces().setAutoTopicCreation("testTenant/ns1", partitionedWrongOverridePolicy);
            fail();
        } catch (Exception ex) {
            assertTrue(ex.getCause() instanceof NotAcceptableException);
        }
    }
    @Test
    public void testMaxTopicsPerNamespace() throws Exception {
        restartClusterAfterTest();
        cleanup();
        conf.setMaxTopicsPerNamespace(10);
        setup();
        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
        admin.tenants().createTenant("testTenant", tenantInfo);
        admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));

        // check create partitioned/non-partitioned topics
        String topic = "persistent://testTenant/ns1/test_create_topic_v";
        admin.topics().createPartitionedTopic(topic + "1", 2);
        admin.topics().createPartitionedTopic(topic + "2", 3);
        admin.topics().createPartitionedTopic(topic + "3", 4);
        admin.topics().createNonPartitionedTopic(topic + "4");
        try {
            admin.topics().createPartitionedTopic(topic + "5", 2);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 412);
            Assert.assertEquals(e.getHttpError(), "Exceed maximum number of topics in namespace.");
        }

        //unlimited
        cleanup();
        conf.setMaxTopicsPerNamespace(0);
        setup();
        admin.tenants().createTenant("testTenant", tenantInfo);
        admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
        for (int i = 0; i < 10; ++i) {
            admin.topics().createPartitionedTopic(topic + i, 2);
            admin.topics().createNonPartitionedTopic(topic + i + i);
        }

        // check first create normal topic, then system topics, unlimited even setMaxTopicsPerNamespace
        cleanup();
        conf.setMaxTopicsPerNamespace(5);
        setup();
        admin.tenants().createTenant("testTenant", tenantInfo);
        admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
        for (int i = 0; i < 5; ++i) {
            admin.topics().createPartitionedTopic(topic + i, 1);
        }
        admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 6);


        // check first create system topics, then normal topic, unlimited even setMaxTopicsPerNamespace
        cleanup();
        conf.setMaxTopicsPerNamespace(5);
        setup();
        admin.tenants().createTenant("testTenant", tenantInfo);
        admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
        admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 6);
        for (int i = 0; i < 5; ++i) {
            admin.topics().createPartitionedTopic(topic + i, 1);
        }

        // check producer/consumer auto create partitioned topic
        cleanup();
        conf.setMaxTopicsPerNamespace(10);
        conf.setDefaultNumPartitions(3);
        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
        setup();
        admin.tenants().createTenant("testTenant", tenantInfo);
        admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));

        pulsarClient.newProducer().topic(topic + "1").create().close();
        pulsarClient.newProducer().topic(topic + "2").create().close();
        pulsarClient.newConsumer().topic(topic + "3").subscriptionName("test_sub").subscribe().close();
        try {
            pulsarClient.newConsumer().topic(topic + "4").subscriptionName("test_sub").subscribe().close();
            Assert.fail();
        } catch (PulsarClientException e) {
            log.info("Exception: ", e);
        }

        // check producer/consumer auto create non-partitioned topic
        cleanup();
        conf.setMaxTopicsPerNamespace(3);
        conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
        setup();
        admin.tenants().createTenant("testTenant", tenantInfo);
        admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));

        pulsarClient.newProducer().topic(topic + "1").create().close();
        pulsarClient.newProducer().topic(topic + "2").create().close();
        pulsarClient.newConsumer().topic(topic + "3").subscriptionName("test_sub").subscribe().close();
        try {
            pulsarClient.newConsumer().topic(topic + "4").subscriptionName("test_sub").subscribe().close();
            Assert.fail();
        } catch (PulsarClientException e) {
            log.info("Exception: ", e);
        }
    }

    @Test
    public void testInvalidBundleErrorResponse() throws Exception {
        try {
            admin.namespaces().deleteNamespaceBundle(defaultNamespace, "invalid-bundle");
            fail("should have failed due to invalid bundle");
        } catch (PreconditionFailedException e) {
            assertTrue(e.getMessage().startsWith("Invalid bundle range"));
        }
    }

    @Test
    public void testMaxSubscriptionsPerTopic() throws Exception {
        restartClusterAfterTest();
        cleanup();
        conf.setMaxSubscriptionsPerTopic(2);
        setup();

        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
        admin.tenants().createTenant("testTenant", tenantInfo);
        admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));

        final String topic = "persistent://testTenant/ns1/max-subscriptions-per-topic";

        admin.topics().createPartitionedTopic(topic, 3);
        Producer producer = pulsarClient.newProducer().topic(topic).create();
        producer.close();

        // create subscription
        admin.topics().createSubscription(topic, "test-sub1", MessageId.earliest);
        admin.topics().createSubscription(topic, "test-sub2", MessageId.earliest);
        try {
            admin.topics().createSubscription(topic, "test-sub3", MessageId.earliest);
            Assert.fail();
        } catch (PulsarAdminException e) {
            log.info("create subscription failed. Exception: ", e);
        }

        cleanup();
        conf.setMaxSubscriptionsPerTopic(0);
        setup();

        admin.tenants().createTenant("testTenant", tenantInfo);
        admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));

        admin.topics().createPartitionedTopic(topic, 3);
        producer = pulsarClient.newProducer().topic(topic).create();
        producer.close();

        for (int i = 0; i < 10; ++i) {
            admin.topics().createSubscription(topic, "test-sub" + i, MessageId.earliest);
        }

        cleanup();
        conf.setMaxSubscriptionsPerTopic(2);
        setup();

        admin.tenants().createTenant("testTenant", tenantInfo);
        admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));

        admin.topics().createPartitionedTopic(topic, 3);
        producer = pulsarClient.newProducer().topic(topic).create();
        producer.close();

        Consumer consumer1 = null;
        Consumer consumer2 = null;
        Consumer consumer3 = null;

        try {
            consumer1 = pulsarClient.newConsumer().subscriptionName("test-sub1").topic(topic).subscribe();
            Assert.assertNotNull(consumer1);
        } catch (PulsarClientException e) {
            Assert.fail();
        }

        try {
            consumer2 = pulsarClient.newConsumer().subscriptionName("test-sub2").topic(topic).subscribe();
            Assert.assertNotNull(consumer2);
        } catch (PulsarClientException e) {
            Assert.fail();
        }

        try {
            consumer3 = pulsarClient.newConsumer().subscriptionName("test-sub3").topic(topic).subscribe();
            Assert.fail();
        } catch (PulsarClientException e) {
            log.info("subscription reached max subscriptions per topic");
        }

        consumer1.close();
        consumer2.close();
        admin.topics().deletePartitionedTopic(topic);
    }

    @Test(timeOut = 30000)
    public void testMaxSubPerTopicApi() throws Exception {
        final String myNamespace = newUniqueName(defaultTenant + "/ns");
        admin.namespaces().createNamespace(myNamespace, Set.of("test"));

        assertNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace));

        admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace,100);
        assertEquals(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace).intValue(),100);
        admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
        assertNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace));

        admin.namespaces().setMaxSubscriptionsPerTopicAsync(myNamespace,200).get();
        assertEquals(admin.namespaces().getMaxSubscriptionsPerTopicAsync(myNamespace).get().intValue(),200);
        admin.namespaces().removeMaxSubscriptionsPerTopicAsync(myNamespace);
        Awaitility.await().untilAsserted(()
                -> assertNull(admin.namespaces().getMaxSubscriptionsPerTopicAsync(myNamespace).get()));

        try {
            admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace,-100);
            fail("should fail");
        } catch (PulsarAdminException ignore) {
        }
    }

    @Test(timeOut = 60000)
    public void testSetNamespaceEntryFilters() throws Exception {
        restartClusterAfterTest();
        restartClusterIfReused();
        @Cleanup
        final MockEntryFilterProvider testEntryFilterProvider =
                new MockEntryFilterProvider(conf);

        testEntryFilterProvider
                .setMockEntryFilters(new EntryFilterDefinition(
                        "test",
                        null,
                        EntryFilterTest.class.getName()
                ));
        final EntryFilterProvider oldEntryFilterProvider = pulsar.getBrokerService().getEntryFilterProvider();
        FieldUtils.writeField(pulsar.getBrokerService(),
                "entryFilterProvider", testEntryFilterProvider, true);

        try {
            EntryFilters entryFilters = new EntryFilters("test");

            final String myNamespace = newUniqueName(defaultTenant + "/ns");
            admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
            final String topicName = myNamespace + "/topic";
            admin.topics().createNonPartitionedTopic(topicName);

            assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
            assertEquals(pulsar
                    .getBrokerService()
                    .getTopic(topicName, false)
                    .get()
                    .get()
                    .getEntryFilters()
                    .size(), 0);

            admin.namespaces().setNamespaceEntryFilters(myNamespace, entryFilters);
            assertEquals(admin.namespaces().getNamespaceEntryFilters(myNamespace), entryFilters);
            Awaitility.await().untilAsserted(() -> {
                assertEquals(pulsar
                        .getBrokerService()
                        .getTopic(topicName, false)
                        .get()
                        .get()
                        .getEntryFiltersPolicy()
                        .getEntryFilterNames(), "test");
            });

            assertEquals(pulsar
                    .getBrokerService()
                    .getTopic(topicName, false)
                    .get()
                    .get()
                    .getEntryFilters()
                    .size(), 1);
            admin.namespaces().removeNamespaceEntryFilters(myNamespace);
            assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
            Awaitility.await().untilAsserted(() -> {
                assertEquals(pulsar
                        .getBrokerService()
                        .getTopic(topicName, false)
                        .get()
                        .get()
                        .getEntryFiltersPolicy()
                        .getEntryFilterNames(), "");
            });

            assertEquals(pulsar
                    .getBrokerService()
                    .getTopic(topicName, false)
                    .get()
                    .get()
                    .getEntryFilters()
                    .size(), 0);
        } finally {
            FieldUtils.writeField(pulsar.getBrokerService(),
                    "entryFilterProvider", oldEntryFilterProvider, true);
        }
    }

    @Test(dataProvider = "topicType")
    public void testSetTopicLevelEntryFilters(String topicType) throws Exception {
        restartClusterAfterTest();
        restartClusterIfReused();
        @Cleanup
        final MockEntryFilterProvider testEntryFilterProvider =
                new MockEntryFilterProvider(conf);

        testEntryFilterProvider
                .setMockEntryFilters(new EntryFilterDefinition(
                        "test",
                        null,
                        EntryFilterTest.class.getName()
                ));
        final EntryFilterProvider oldEntryFilterProvider = pulsar.getBrokerService().getEntryFilterProvider();
        FieldUtils.writeField(pulsar.getBrokerService(),
                "entryFilterProvider", testEntryFilterProvider, true);
        try {
            EntryFilters entryFilters = new EntryFilters("test");
            final String topic = topicType + "://" + defaultNamespace + "/test-schema-validation-enforced";
            admin.topics().createPartitionedTopic(topic, 1);
            final String fullTopicName = topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0;
            @Cleanup
            Producer<byte[]> producer1 = pulsarClient.newProducer()
                    .topic(fullTopicName)
                    .create();
            assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, false));
            assertEquals(pulsar
                    .getBrokerService()
                    .getTopic(fullTopicName, false)
                    .get()
                    .get()
                    .getEntryFilters()
                    .size(), 0);
            admin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilters);
            Awaitility.await().untilAsserted(() -> assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
                    false), entryFilters));
            Awaitility.await().untilAsserted(() -> {
                assertEquals(pulsar
                        .getBrokerService()
                        .getTopic(fullTopicName, false)
                        .get()
                        .get()
                        .getEntryFiltersPolicy()
                        .getEntryFilterNames(), "test");
            });
            assertEquals(pulsar
                    .getBrokerService()
                    .getTopic(fullTopicName, false)
                    .get()
                    .get()
                    .getEntryFilters()
                    .size(), 1);
            admin.topicPolicies().removeEntryFiltersPerTopic(topic);
            assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, false));
            Awaitility.await().untilAsserted(() -> {
                assertEquals(pulsar
                        .getBrokerService()
                        .getTopic(fullTopicName, false)
                        .get()
                        .get()
                        .getEntryFiltersPolicy()
                        .getEntryFilterNames(), "");
            });
            assertEquals(pulsar
                    .getBrokerService()
                    .getTopic(fullTopicName, false)
                    .get()
                    .get()
                    .getEntryFilters()
                    .size(), 0);
        } finally {
            FieldUtils.writeField(pulsar.getBrokerService(),
                    "entryFilterProvider", oldEntryFilterProvider, true);
        }
    }

    @Test(timeOut = 60000)
    public void testSetEntryFiltersHierarchy() throws Exception {
        restartClusterAfterTest();
        restartClusterIfReused();
        @Cleanup
        final MockEntryFilterProvider testEntryFilterProvider =
                new MockEntryFilterProvider(conf);
        testEntryFilterProvider.setMockEntryFilters(new EntryFilterDefinition(
                        "test",
                        null,
                        EntryFilterTest.class.getName()
                ), new EntryFilterDefinition(
                        "test1",
                        null,
                        EntryFilter2Test.class.getName()
                ));
        final EntryFilterProvider oldEntryFilterProvider = pulsar.getBrokerService().getEntryFilterProvider();
        FieldUtils.writeField(pulsar.getBrokerService(),
                "entryFilterProvider", testEntryFilterProvider, true);
        conf.setEntryFilterNames(List.of("test", "test1"));
        conf.setAllowOverrideEntryFilters(true);
        try {

            final String topic = "persistent://" + defaultNamespace + "/test-schema-validation-enforced";
            admin.topics().createPartitionedTopic(topic, 1);
            final String fullTopicName = topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0;
            @Cleanup
            Producer<byte[]> producer1 = pulsarClient.newProducer()
                    .topic(fullTopicName)
                    .create();
            assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, false));
            Awaitility.await().untilAsserted(() ->
                    assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, true),
                            new EntryFilters("test,test1")));
            assertEquals(pulsar
                    .getBrokerService()
                    .getTopic(fullTopicName, false)
                    .get()
                    .get()
                    .getEntryFilters()
                    .size(), 2);

            EntryFilters nsEntryFilters = new EntryFilters("test");
            admin.namespaces().setNamespaceEntryFilters(defaultNamespace, nsEntryFilters);
            assertEquals(admin.namespaces().getNamespaceEntryFilters(defaultNamespace), nsEntryFilters);
            Awaitility.await().untilAsserted(() ->
                    assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, true),
                            new EntryFilters("test")));
            Awaitility.await().untilAsserted(() -> {
                assertEquals(pulsar
                        .getBrokerService()
                        .getTopic(fullTopicName, false)
                        .get()
                        .get()
                        .getEntryFiltersPolicy()
                        .getEntryFilterNames(), "test");
            });

            Awaitility.await().untilAsserted(() -> {
                final List<EntryFilter> entryFilters = pulsar
                        .getBrokerService()
                        .getTopic(fullTopicName, false)
                        .get()
                        .get()
                        .getEntryFilters();
                assertEquals(entryFilters.size(), 1);
                assertEquals(((EntryFilterWithClassLoader)entryFilters.get(0))
                        .getEntryFilter().getClass(), EntryFilterTest.class);

            });


            EntryFilters topicEntryFilters = new EntryFilters("test1");
            admin.topicPolicies().setEntryFiltersPerTopic(topic, topicEntryFilters);
            Awaitility.await().untilAsserted(() -> assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
                    false), topicEntryFilters));
            Awaitility.await().untilAsserted(() ->
                    assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, true),
                            new EntryFilters("test1")));
            Awaitility.await().untilAsserted(() -> {
                assertEquals(pulsar
                        .getBrokerService()
                        .getTopic(fullTopicName, false)
                        .get()
                        .get()
                        .getEntryFiltersPolicy()
                        .getEntryFilterNames(), "test1");
            });
            final List<EntryFilter> entryFilters = pulsar
                    .getBrokerService()
                    .getTopic(fullTopicName, false)
                    .get()
                    .get()
                    .getEntryFilters();
            assertEquals(entryFilters.size(), 1);
            assertEquals(((EntryFilterWithClassLoader) entryFilters.get(0))
                    .getEntryFilter().getClass(), EntryFilter2Test.class);

        } finally {
            FieldUtils.writeField(pulsar.getBrokerService(),
                    "entryFilterProvider", oldEntryFilterProvider, true);
        }
    }

    @Test(timeOut = 60000)
    public void testValidateNamespaceEntryFilters() throws Exception {
        restartClusterAfterTest();
        restartClusterIfReused();
        @Cleanup
        final MockEntryFilterProvider testEntryFilterProvider =
                new MockEntryFilterProvider(conf);

        testEntryFilterProvider
                .setMockEntryFilters(new EntryFilterDefinition(
                        "test",
                        null,
                        EntryFilterTest.class.getName()
                ));
        final EntryFilterProvider oldEntryFilterProvider = pulsar.getBrokerService().getEntryFilterProvider();
        FieldUtils.writeField(pulsar.getBrokerService(),
                "entryFilterProvider", testEntryFilterProvider, true);

        try {
            final String myNamespace = newUniqueName(defaultTenant + "/ns");
            admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
            try {
                admin.namespaces().setNamespaceEntryFilters(myNamespace, new EntryFilters("notexists"));
                fail();
            } catch (PulsarAdminException e) {
                assertEquals(e.getStatusCode(), 400);
                assertEquals(e.getMessage(), "Entry filter 'notexists' not found");
            }
            try {
                admin.namespaces().setNamespaceEntryFilters(myNamespace, new EntryFilters(""));
                fail();
            } catch (PulsarAdminException e) {
                assertEquals(e.getStatusCode(), 400);
                assertEquals(e.getMessage(), "entryFilterNames can't be empty. " +
                        "To remove entry filters use the remove method.");
            }
            try {
                admin.namespaces().setNamespaceEntryFilters(myNamespace, new EntryFilters(","));
                fail();
            } catch (PulsarAdminException e) {
                assertEquals(e.getStatusCode(), 400);
                assertEquals(e.getMessage(), "entryFilterNames can't be empty. " +
                        "To remove entry filters use the remove method.");
            }
            try {
                admin.namespaces().setNamespaceEntryFilters(myNamespace, new EntryFilters("test,notexists"));
                fail();
            } catch (PulsarAdminException e) {
                assertEquals(e.getStatusCode(), 400);
                assertEquals(e.getMessage(), "Entry filter 'notexists' not found");
            }
            assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
        } finally {
            FieldUtils.writeField(pulsar.getBrokerService(),
                    "entryFilterProvider", oldEntryFilterProvider, true);
        }
    }

    @Test(timeOut = 60000)
    public void testValidateTopicEntryFilters() throws Exception {
        restartClusterAfterTest();
        restartClusterIfReused();
        @Cleanup
        final MockEntryFilterProvider testEntryFilterProvider =
                new MockEntryFilterProvider(conf);

        testEntryFilterProvider
                .setMockEntryFilters(new EntryFilterDefinition(
                        "test",
                        null,
                        EntryFilterTest.class.getName()
                ));
        final EntryFilterProvider oldEntryFilterProvider = pulsar.getBrokerService().getEntryFilterProvider();
        FieldUtils.writeField(pulsar.getBrokerService(),
                "entryFilterProvider", testEntryFilterProvider, true);

        try {
            final String myNamespace = newUniqueName(defaultTenant + "/ns");
            admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
            final String topicName = myNamespace + "/topic";
            admin.topics().createNonPartitionedTopic(topicName);
            @Cleanup
            Producer<byte[]> producer1 = pulsarClient.newProducer()
                    .topic(topicName)
                    .create();
            try {
                admin.topicPolicies().setEntryFiltersPerTopic(topicName, new EntryFilters("notexists"));
                fail();
            } catch (PulsarAdminException e) {
                assertEquals(e.getStatusCode(), 400);
                assertEquals(e.getMessage(), "Entry filter 'notexists' not found");
            }
            try {
                admin.topicPolicies().setEntryFiltersPerTopic(topicName, new EntryFilters(""));
                fail();
            } catch (PulsarAdminException e) {
                assertEquals(e.getStatusCode(), 400);
                assertEquals(e.getMessage(), "entryFilterNames can't be empty. " +
                        "To remove entry filters use the remove method.");
            }
            try {
                admin.topicPolicies().setEntryFiltersPerTopic(topicName, new EntryFilters(","));
                fail();
            } catch (PulsarAdminException e) {
                assertEquals(e.getStatusCode(), 400);
                assertEquals(e.getMessage(), "entryFilterNames can't be empty. " +
                        "To remove entry filters use the remove method.");
            }
            try {
                admin.topicPolicies().setEntryFiltersPerTopic(topicName, new EntryFilters("test,notexists"));
                fail();
            } catch (PulsarAdminException e) {
                assertEquals(e.getStatusCode(), 400);
                assertEquals(e.getMessage(), "Entry filter 'notexists' not found");
            }
            assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topicName, false));
        } finally {
            FieldUtils.writeField(pulsar.getBrokerService(),
                    "entryFilterProvider", oldEntryFilterProvider, true);
        }
    }

    @Test(timeOut = 30000)
    public void testMaxSubPerTopic() throws Exception {
        restartClusterAfterTest();
        pulsar.getConfiguration().setMaxSubscriptionsPerTopic(0);
        final String myNamespace = newUniqueName(defaultTenant + "/ns");
        admin.namespaces().createNamespace(myNamespace, Set.of("test"));
        final String topic = "persistent://" + myNamespace + "/testMaxSubPerTopic";
        pulsarClient.newProducer().topic(topic).create().close();
        final int maxSub = 2;
        admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, maxSub);
        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
        Awaitility.await().until(() ->
                persistentTopic.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get() == maxSub);

        List<Consumer<?>> consumerList = new ArrayList<>(maxSub);
        for (int i = 0; i < maxSub; i++) {
            Consumer<?> consumer =
                    pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
            consumerList.add(consumer);
        }
        //Create a client that can fail quickly
        try (PulsarClient client = PulsarClient.builder().operationTimeout(2,TimeUnit.SECONDS)
                .serviceUrl(brokerUrl.toString()).build()){
            @Cleanup
            Consumer<byte[]> subscribe =
                    client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
            fail("should fail");
        } catch (Exception ignore) {
        }
        //After removing the restriction, it should be able to create normally
        admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
        Awaitility.await().until(() ->
                persistentTopic.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get() == 0);
        Consumer<?> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
                .subscribe();
        consumerList.add(consumer);

        for (Consumer<?> c : consumerList) {
            c.close();
        }
    }

    @Test(timeOut = 30000)
    public void testMaxSubPerTopicPriority() throws Exception {
        restartClusterAfterTest();
        final int brokerLevelMaxSub = 2;
        cleanup();
        conf.setMaxSubscriptionsPerTopic(brokerLevelMaxSub);
        setup();

        final String myNamespace = newUniqueName(defaultTenant + "/ns");
        admin.namespaces().createNamespace(myNamespace, Set.of("test"));
        final String topic = "persistent://" + myNamespace + "/testMaxSubPerTopic";
        //Create a client that can fail quickly
        @Cleanup
        PulsarClient client = PulsarClient.builder().operationTimeout(2,TimeUnit.SECONDS)
                .serviceUrl(brokerUrl.toString()).build();
        //We can only create 2 consumers
        List<Consumer<?>> consumerList = new ArrayList<>(brokerLevelMaxSub);
        for (int i = 0; i < brokerLevelMaxSub; i++) {
            Consumer<?> consumer =
                    pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
            consumerList.add(consumer);
        }
        try {
            @Cleanup
            Consumer<byte[]> subscribe =
                    client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
            fail("should fail");
        } catch (Exception ignore) {

        }
        //Set namespace-level policy,the limit should up to 4
        final int nsLevelMaxSub = 4;
        admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, nsLevelMaxSub);
        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
        Awaitility.await().until(() -> persistentTopic.getHierarchyTopicPolicies()
                .getMaxSubscriptionsPerTopic().get() == nsLevelMaxSub);
        Consumer<?> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
                .subscribe();
        consumerList.add(consumer);
        assertEquals(consumerList.size(), 3);
        //After removing the restriction, it should fail again
        admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
        Awaitility.await().until(() -> persistentTopic.getHierarchyTopicPolicies()
                .getMaxSubscriptionsPerTopic().get() == brokerLevelMaxSub);
        try {
            @Cleanup
            Consumer<byte[]> subscribe =
                    client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
            fail("should fail");
        } catch (Exception ignore) {

        }

        for (Consumer<?> c : consumerList) {
            c.close();
        }
    }

    @Test
    public void testMaxProducersPerTopicUnlimited() throws Exception {
        restartClusterAfterTest();
        final int maxProducersPerTopic = 1;
        cleanup();
        conf.setMaxProducersPerTopic(maxProducersPerTopic);
        setup();
        //init namespace
        final String myNamespace = newUniqueName(defaultTenant + "/ns");
        admin.namespaces().createNamespace(myNamespace, Set.of("test"));
        final String topic = "persistent://" + myNamespace + "/testMaxProducersPerTopicUnlimited";
        //the policy is set to 0, so there will be no restrictions
        admin.namespaces().setMaxProducersPerTopic(myNamespace, 0);
        Awaitility.await().until(()
                -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 0);
        List<Producer<byte[]>> producers = new ArrayList<>();
        for (int i = 0; i < maxProducersPerTopic + 1; i++) {
            Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
            producers.add(producer);
        }

        admin.namespaces().removeMaxProducersPerTopic(myNamespace);
        Awaitility.await().until(()
                -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == null);
        try {
            @Cleanup
            Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
            fail("should fail");
        } catch (PulsarClientException e) {
            String expectMsg = "Topic '" + topic + "' reached max producers limit";
            assertTrue(e.getMessage().contains(expectMsg));
        }
        //set the limit to 3
        admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
        Awaitility.await().until(()
                -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 3);
        // should success
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
        producers.add(producer);
        try {
            @Cleanup
            Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic).create();
            fail("should fail");
        } catch (PulsarClientException e) {
            String expectMsg = "Topic '" + topic + "' reached max producers limit";
            assertTrue(e.getMessage().contains(expectMsg));
        }

        //clean up
        for (Producer<byte[]> tempProducer : producers) {
            tempProducer.close();
        }
    }

    @Test
    public void testMaxConsumersPerTopicUnlimited() throws Exception {
        restartClusterAfterTest();
        final int maxConsumersPerTopic = 1;
        cleanup();
        conf.setMaxConsumersPerTopic(maxConsumersPerTopic);
        setup();
        //init namespace
        final String myNamespace = newUniqueName(defaultTenant + "/ns");
        admin.namespaces().createNamespace(myNamespace, Set.of("test"));
        final String topic = "persistent://" + myNamespace + "/testMaxConsumersPerTopicUnlimited";

        assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace));
        //the policy is set to 0, so there will be no restrictions
        admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0);
        Awaitility.await().until(()
                -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0);
        List<Consumer<byte[]>> consumers = new ArrayList<>();
        for (int i = 0; i < maxConsumersPerTopic + 1; i++) {
            Consumer<byte[]> consumer =
                    pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
            consumers.add(consumer);
        }

        admin.namespaces().removeMaxConsumersPerTopic(myNamespace);
        Awaitility.await().until(()
                -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == null);
        try {
            @Cleanup
            Consumer<byte[]> subscribe =
                    pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
            fail("should fail");
        } catch (PulsarClientException e) {
            assertTrue(e.getMessage().contains("Topic reached max consumers limit"));
        }
        //set the limit to 3
        admin.namespaces().setMaxConsumersPerTopic(myNamespace, 3);
        Awaitility.await().until(()
                -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 3);
        // should success
        Consumer<byte[]> consumer =
                pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
        consumers.add(consumer);
        try {
            @Cleanup
            Consumer<byte[]> subscribe =
                    pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
            fail("should fail");
        } catch (PulsarClientException e) {
            assertTrue(e.getMessage().contains("Topic reached max consumers limit"));
        }

        //clean up
        for (Consumer<byte[]> subConsumer : consumers) {
            subConsumer.close();
        }
    }

    @Test
    public void testClearBacklogForTheSubscriptionThatNoConsumers() throws Exception {
        final String topic = "persistent://" + defaultNamespace + "/clear_backlog_no_consumers" + UUID.randomUUID().toString();
        final String sub = "my-sub";
        admin.topics().createNonPartitionedTopic(topic);
        admin.topics().createSubscription(topic, sub, MessageId.earliest);
        admin.topics().skipAllMessages(topic, sub);
    }

    @Test(timeOut = 200000)
    public void testCompactionApi() throws Exception {
        final String namespace = newUniqueName(defaultTenant + "/ns");
        admin.namespaces().createNamespace(namespace, Set.of("test"));


        assertNull(admin.namespaces().getCompactionThreshold(namespace));
        assertEquals(pulsar.getConfiguration().getBrokerServiceCompactionThresholdInBytes(), 0);

        admin.namespaces().setCompactionThreshold(namespace, 10);
        Awaitility.await().untilAsserted(() ->
                assertNotNull(admin.namespaces().getCompactionThreshold(namespace)));
        assertEquals(admin.namespaces().getCompactionThreshold(namespace).intValue(), 10);

        admin.namespaces().removeCompactionThreshold(namespace);
        Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getCompactionThreshold(namespace)));
    }

    @Test(timeOut = 200000)
    public void testCompactionPriority() throws Exception {
        restartClusterAfterTest();
        cleanup();
        conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10000);
        setup();
        final String namespace = newUniqueName(defaultTenant + "/ns");
        admin.namespaces().createNamespace(namespace, Set.of("test"));
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        pulsarClient.newProducer().topic(topic).create().close();
        TopicName topicName = TopicName.get(topic);
        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
        PersistentTopic mockTopic = spy(persistentTopic);
        mockTopic.checkCompaction();
        // Disabled by default
        verify(mockTopic, times(0)).triggerCompaction();
        // Set namespace-level policy
        admin.namespaces().setCompactionThreshold(namespace, 1);
        Awaitility.await().untilAsserted(() ->
                assertNotNull(admin.namespaces().getCompactionThreshold(namespace)));
        ManagedLedger managedLedger = persistentTopic.getManagedLedger();
        Field field = managedLedger.getClass().getDeclaredField("totalSize");
        field.setAccessible(true);
        field.setLong(managedLedger, 1000L);

        mockTopic.checkCompaction();
        verify(mockTopic, times(1)).triggerCompaction();
        //Set topic-level policy
        admin.topics().setCompactionThreshold(topic, 0);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getCompactionThreshold(topic)));
        mockTopic.checkCompaction();
        verify(mockTopic, times(1)).triggerCompaction();
        // Remove topic-level policy
        admin.topics().removeCompactionThreshold(topic);
        Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getCompactionThreshold(topic)));
        mockTopic.checkCompaction();
        verify(mockTopic, times(2)).triggerCompaction();
        // Remove namespace-level policy
        admin.namespaces().removeCompactionThreshold(namespace);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin.namespaces().getCompactionThreshold(namespace)));
        mockTopic.checkCompaction();
        verify(mockTopic, times(2)).triggerCompaction();
    }

    @Test
    public void testProperties() throws Exception {
        final String namespace = newUniqueName(defaultTenant + "/ns");
        admin.namespaces().createNamespace(namespace, Set.of("test"));
        admin.namespaces().setProperty(namespace, "a", "a");
        assertEquals("a", admin.namespaces().getProperty(namespace, "a"));
        assertNull(admin.namespaces().getProperty(namespace, "b"));
        admin.namespaces().setProperty(namespace, "b", "b");
        assertEquals("b", admin.namespaces().getProperty(namespace, "b"));
        admin.namespaces().setProperty(namespace, "a", "a1");
        assertEquals("a1", admin.namespaces().getProperty(namespace, "a"));
        assertEquals("b", admin.namespaces().removeProperty(namespace, "b"));
        assertNull(admin.namespaces().getProperty(namespace, "b"));
        admin.namespaces().clearProperties(namespace);
        assertEquals(admin.namespaces().getProperties(namespace).size(), 0);
        Map<String, String> properties = new HashMap<>();
        properties.put("aaa", "aaa");
        properties.put("bbb", "bbb");
        admin.namespaces().setProperties(namespace, properties);
        assertEquals(admin.namespaces().getProperties(namespace), properties);
        admin.namespaces().clearProperties(namespace);
        assertEquals(admin.namespaces().getProperties(namespace).size(), 0);
    }

    @Test
    public void testGetListInBundle() throws Exception {
        final String namespace = defaultTenant + "/ns11";
        admin.namespaces().createNamespace(namespace, 3);

        final String persistentTopicName = TopicName.get(
                "persistent", NamespaceName.get(namespace),
                "get_topics_mode_" + UUID.randomUUID()).toString();

        final String nonPersistentTopicName = TopicName.get(
                "non-persistent", NamespaceName.get(namespace),
                "get_topics_mode_" + UUID.randomUUID()).toString();
        admin.topics().createPartitionedTopic(persistentTopicName, 3);
        admin.topics().createPartitionedTopic(nonPersistentTopicName, 3);
        pulsarClient.newProducer().topic(persistentTopicName).create().close();
        pulsarClient.newProducer().topic(nonPersistentTopicName).create().close();

        BundlesData bundlesData = admin.namespaces().getBundles(namespace);
        List<String> boundaries = bundlesData.getBoundaries();
        int topicNum = 0;
        for (int i = 0; i < boundaries.size() - 1; i++) {
            String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
            List<String> topic = admin.topics().getListInBundle(namespace, bundle);
            if (topic == null) {
                continue;
            }
            topicNum += topic.size();
            for (String s : topic) {
                assertFalse(TopicName.get(s).isPersistent());
            }
        }
        assertEquals(topicNum, 3);
    }

    @Test
    public void testGetTopicsWithDifferentMode() throws Exception {
        final String namespace = newUniqueName(defaultTenant + "/ns");
        admin.namespaces().createNamespace(namespace, Set.of("test"));

        final String persistentTopicName = TopicName
                .get("persistent", NamespaceName.get(namespace), "get_topics_mode_" + UUID.randomUUID().toString())
                .toString();

        final String nonPersistentTopicName = TopicName
                .get("non-persistent", NamespaceName.get(namespace), "get_topics_mode_" + UUID.randomUUID().toString())
                .toString();

        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistentTopicName).create();
        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(nonPersistentTopicName).create();

        List<String> topics = new ArrayList<>(admin.topics().getList(namespace));
        assertEquals(topics.size(), 2);
        assertTrue(topics.contains(persistentTopicName));
        assertTrue(topics.contains(nonPersistentTopicName));

        topics.clear();

        topics.addAll(admin.topics().getList(namespace, TopicDomain.persistent));
        assertEquals(topics.size(), 1);
        assertTrue(topics.contains(persistentTopicName));

        topics.clear();

        topics.addAll(admin.topics().getList(namespace, TopicDomain.non_persistent));
        assertEquals(topics.size(), 1);
        assertTrue(topics.contains(nonPersistentTopicName));

        try {
            admin.topics().getList(namespace, TopicDomain.getEnum("none"));
            fail("Should failed with invalid get topic mode.");
        } catch (IllegalArgumentException e) {
            assertEquals(e.getMessage(), "Invalid topic domain: 'none'");
        }

        producer1.close();
        producer2.close();
    }

    @Test(dataProvider = "isV1")
    public void testNonPartitionedTopic(boolean isV1) throws Exception {
        restartClusterAfterTest();
        String tenant = defaultTenant;
        String cluster = "test";
        String namespace = tenant + "/" + (isV1 ? cluster + "/" : "") + "n1" + isV1;
        String topic = "persistent://" + namespace + "/t1" + isV1;
        admin.namespaces().createNamespace(namespace, Set.of(cluster));
        admin.topics().createNonPartitionedTopic(topic);
        assertTrue(admin.topics().getList(namespace).contains(topic));
    }

    /**
     * Validate retring failed partitioned topic should succeed.
     * @throws Exception
     */
    @Test
    public void testFailedUpdatePartitionedTopic() throws Exception {
        final String topicName = "failed-topic";
        final String subName1 = topicName + "-my-sub-1";
        final int startPartitions = 4;
        final int newPartitions = 8;
        final String partitionedTopicName = "persistent://" + defaultNamespace + "/" + topicName;

        URL pulsarUrl = new URL(pulsar.getWebServiceAddress());

        admin.topics().createPartitionedTopic(partitionedTopicName, startPartitions);
        @Cleanup
        PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build();
        Consumer<byte[]> consumer1 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName1)
                .subscriptionType(SubscriptionType.Shared).subscribe();
        consumer1.close();

        // validate partition topic is created
        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, startPartitions);

        // create a subscription for few new partition which can fail
        try {
            admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1,
                    MessageId.earliest);
            fail("Unexpected behaviour");
        } catch (PulsarAdminException.ConflictException ex) {
            // OK
        }

        admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, true);
        // validate subscription is created for new partition.
        for (int i = startPartitions; i < newPartitions; i++) {
            assertNotNull(
                    admin.topics().getStats(partitionedTopicName + "-partition-" + i).getSubscriptions().get(subName1));
        }

        // validate update partition is success
        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, newPartitions);
    }

    /**
     * Validate retring failed partitioned topic should succeed.
     * @throws Exception
     */
    @Test
    public void testTopicStatsWithEarliestTimeInBacklogIfNoBacklog() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
        final String subscriptionName = "s1";
        admin.topics().createNonPartitionedTopic(topicName);
        admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);

        // Send one message.
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false)
                .create();
        MessageIdImpl messageId = (MessageIdImpl) producer.send("123");
        // Catch up.
        admin.topics().skipAllMessages(topicName, subscriptionName);
        // Get topic stats with earliestTimeInBacklog
        TopicStats topicStats = admin.topics().getStats(topicName, false, false, true);
        assertEquals(topicStats.getSubscriptions().get(subscriptionName).getEarliestMsgPublishTimeInBacklog(), -1L);

        // cleanup.
        producer.close();
        admin.topics().delete(topicName);
    }

    @Test(dataProvider = "topicType")
    public void testPartitionedStatsAggregationByProducerName(String topicType) throws Exception {
        restartClusterIfReused();
        conf.setAggregatePublisherStatsByProducerName(true);
        final String topic = topicType + "://" + defaultNamespace + "/test-partitioned-stats-aggregation-by-producer-name";
        admin.topics().createPartitionedTopic(topic, 10);

        @Cleanup
        Producer<byte[]> producer1 = pulsarClient.newProducer()
                .topic(topic)
                .enableLazyStartPartitionedProducers(true)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.CustomPartition)
                .messageRouter(new MessageRouter() {
                    @Override
                    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
                        return msg.hasKey() ? Integer.parseInt(msg.getKey()) : 0;
                    }
                })
                .accessMode(ProducerAccessMode.Shared)
                .create();

        @Cleanup
        Producer<byte[]> producer2 = pulsarClient.newProducer()
                .topic(topic)
                .enableLazyStartPartitionedProducers(true)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.CustomPartition)
                .messageRouter(new MessageRouter() {
                    @Override
                    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
                        return msg.hasKey() ? Integer.parseInt(msg.getKey()) : 5;
                    }
                })
                .accessMode(ProducerAccessMode.Shared)
                .create();

        for (int i = 0; i < 10; i++) {
            producer1.newMessage()
                    .key(String.valueOf(i % 5))
                    .value(("message".getBytes(StandardCharsets.UTF_8)))
                    .send();
            producer2.newMessage()
                    .key(String.valueOf(i % 5 + 5))
                    .value(("message".getBytes(StandardCharsets.UTF_8)))
                    .send();
        }

        PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(topic, true);
        assertEquals(topicStats.getPartitions().size(), 10);
        assertEquals(topicStats.getPartitions().values().stream().mapToInt(e -> e.getPublishers().size()).sum(), 10);
        assertEquals(topicStats.getPartitions().values().stream().map(e -> e.getPublishers().get(0).getProducerName()).distinct().count(), 2);
        assertEquals(topicStats.getPublishers().size(), 2);
        topicStats.getPublishers().forEach(p -> assertTrue(p.isSupportsPartialProducer()));
    }

    @Test(dataProvider = "topicType")
    public void testPartitionedStatsAggregationByProducerNamePerPartition(String topicType) throws Exception {
        restartClusterIfReused();
        conf.setAggregatePublisherStatsByProducerName(true);
        final String topic = topicType + "://" + defaultNamespace + "/test-partitioned-stats-aggregation-by-producer-name-per-pt";
        admin.topics().createPartitionedTopic(topic, 2);

        @Cleanup
        Producer<byte[]> producer1 = pulsarClient.newProducer()
                .topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0)
                .create();

        @Cleanup
        Producer<byte[]> producer2 = pulsarClient.newProducer()
                .topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 1)
                .create();

        PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(topic, true);
        assertEquals(topicStats.getPartitions().size(), 2);
        assertEquals(topicStats.getPartitions().values().stream().mapToInt(e -> e.getPublishers().size()).sum(), 2);
        assertEquals(topicStats.getPartitions().values().stream().map(e -> e.getPublishers().get(0).getProducerName()).distinct().count(), 2);
        assertEquals(topicStats.getPublishers().size(), 2);
        topicStats.getPublishers().forEach(p -> assertTrue(p.isSupportsPartialProducer()));
    }

    @Test(dataProvider = "topicType")
    public void testSchemaValidationEnforced(String topicType) throws Exception {
        final String topic = topicType + "://" + defaultNamespace + "/test-schema-validation-enforced";
        admin.topics().createPartitionedTopic(topic, 1);
        @Cleanup
        Producer<byte[]> producer1 = pulsarClient.newProducer()
                .topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0)
                .create();
        boolean schemaValidationEnforced = admin.topics().getSchemaValidationEnforced(topic, false);
        assertEquals(schemaValidationEnforced, false);
        admin.topics().setSchemaValidationEnforced(topic, true);
        Awaitility.await().untilAsserted(() ->
            assertEquals(admin.topics().getSchemaValidationEnforced(topic, false), true)
        );
    }

    @Test
    public void testGetNamespaceTopicList() throws Exception {
        final String persistentTopic = "persistent://" + defaultNamespace + "/testGetNamespaceTopicList";
        final String nonPersistentTopic = "non-persistent://" + defaultNamespace + "/non-testGetNamespaceTopicList";
        final String eventTopic = "persistent://" + defaultNamespace + "/__change_events";
        admin.topics().createNonPartitionedTopic(persistentTopic);
        Awaitility.await().untilAsserted(() ->
                admin.namespaces().getTopics(defaultNamespace,
                ListNamespaceTopicsOptions.builder().mode(Mode.PERSISTENT).includeSystemTopic(true).build())
                        .contains(eventTopic));
        List<String> notIncludeSystemTopics = admin.namespaces().getTopics(defaultNamespace,
                ListNamespaceTopicsOptions.builder().includeSystemTopic(false).build());
        Assert.assertFalse(notIncludeSystemTopics.contains(eventTopic));
        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(nonPersistentTopic)
                .create();
        List<String> notPersistentTopics = admin.namespaces().getTopics(defaultNamespace,
                ListNamespaceTopicsOptions.builder().mode(Mode.NON_PERSISTENT).build());
        Assert.assertTrue(notPersistentTopics.contains(nonPersistentTopic));
    }

    @Test
    private void testTerminateSystemTopic() throws Exception {
        final String topic = "persistent://" + defaultNamespace + "/testTerminateSystemTopic";
        admin.topics().createNonPartitionedTopic(topic);
        final String eventTopic = "persistent://" + defaultNamespace + "/__change_events";
        admin.topicPolicies().setMaxConsumers(topic, 2);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(admin.topicPolicies().getMaxConsumers(topic), Integer.valueOf(2));
        });
        PulsarAdminException ex = expectThrows(PulsarAdminException.class,
                () -> admin.topics().terminateTopic(eventTopic));
        assertTrue(ex instanceof PulsarAdminException.NotAllowedException);
    }

    @Test
    private void testDeleteNamespaceForciblyWithManyTopics() throws Exception {
        final String ns = defaultTenant + "/ns-testDeleteNamespaceForciblyWithManyTopics";
        admin.namespaces().createNamespace(ns, 2);
        for (int i = 0; i < 100; i++) {
            admin.topics().createPartitionedTopic(String.format("persistent://%s", ns + "/topic" + i), 3);
        }
        admin.namespaces().deleteNamespace(ns, true);
        Assert.assertFalse(admin.namespaces().getNamespaces(defaultTenant).contains(ns));
    }

    @Test
    private void testSetBacklogQuotasNamespaceLevelIfRetentionExists() throws Exception {
        final String ns = defaultTenant + "/ns-testSetBacklogQuotasNamespaceLevel";
        final long backlogQuotaLimitSize = 100000002;
        final int backlogQuotaLimitTime = 2;
        admin.namespaces().createNamespace(ns, 2);
        // create retention.
        admin.namespaces().setRetention(ns, new RetentionPolicies(1800, 10000));
        // set backlog quota.
        admin.namespaces().setBacklogQuota(ns, BacklogQuota.builder()
                .limitSize(backlogQuotaLimitSize).limitTime(backlogQuotaLimitTime).build());
        // Verify result.
        Map<BacklogQuota.BacklogQuotaType, BacklogQuota> map = admin.namespaces().getBacklogQuotaMap(ns);
        assertEquals(map.size(), 1);
        assertTrue(map.containsKey(BacklogQuota.BacklogQuotaType.destination_storage));
        BacklogQuota backlogQuota = map.get(BacklogQuota.BacklogQuotaType.destination_storage);
        assertEquals(backlogQuota.getLimitSize(), backlogQuotaLimitSize);
        assertEquals(backlogQuota.getLimitTime(), backlogQuotaLimitTime);
        // cleanup.
        admin.namespaces().deleteNamespace(ns);
    }

    @Test
    private void testAnalyzeSubscriptionBacklogNotCauseStuck() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp");
        final String subscription = "s1";
        admin.topics().createNonPartitionedTopic(topic);
        // Send 10 messages.
        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(subscription)
                .receiverQueueSize(0).subscribe();
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
        for (int i = 0; i < 10; i++) {
            producer.send(i + "");
        }

        // Verify consumer can receive all messages after calling "analyzeSubscriptionBacklog".
        admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.of(MessageIdImpl.earliest));
        for (int i = 0; i < 10; i++) {
            Awaitility.await().untilAsserted(() -> {
                Message m = consumer.receive();
                assertNotNull(m);
                consumer.acknowledge(m);
            });
        }

        // cleanup.
        consumer.close();
        producer.close();
        admin.topics().delete(topic);
    }

    @Test
    public void testGetStatsIfPartitionNotExists() throws Exception {
        // create topic.
        final String partitionedTp = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp");
        admin.topics().createPartitionedTopic(partitionedTp, 1);
        TopicName partition0 = TopicName.get(partitionedTp).getPartition(0);
        boolean topicExists1 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent();
        assertTrue(topicExists1);
        // Verify topics-stats works.
        TopicStats topicStats = admin.topics().getStats(partition0.toString());
        assertNotNull(topicStats);

        // Delete partition and call topic-stats again.
        admin.topics().delete(partition0.toString());
        boolean topicExists2 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent();
        assertFalse(topicExists2);
        // Verify: respond 404.
        try {
            admin.topics().getStats(partition0.toString());
            fail("Should respond 404 after the partition was deleted");
        } catch (Exception ex) {
            assertTrue(ex.getMessage().contains("Topic partitions were not yet created"));
        }

        // cleanup.
        admin.topics().deletePartitionedTopic(partitionedTp);
    }
}
